telnet server
[folly.git] / folly / wangle / bootstrap / BootstrapTest.cpp
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include "folly/wangle/bootstrap/ServerBootstrap.h"
18 #include "folly/wangle/bootstrap/ClientBootstrap.h"
19 #include "folly/wangle/channel/Handler.h"
20
21 #include <glog/logging.h>
22 #include <gtest/gtest.h>
23 #include <boost/thread.hpp>
24
25 using namespace folly::wangle;
26 using namespace folly;
27
28 typedef Pipeline<IOBufQueue&, std::unique_ptr<IOBuf>> BytesPipeline;
29
30 typedef ServerBootstrap<BytesPipeline> TestServer;
31 typedef ClientBootstrap<BytesPipeline> TestClient;
32
33 class TestClientPipelineFactory : public PipelineFactory<BytesPipeline> {
34  public:
35   std::unique_ptr<BytesPipeline, folly::DelayedDestruction::Destructor>
36   newPipeline(std::shared_ptr<AsyncSocket> sock) {
37     CHECK(sock->good());
38
39     // We probably aren't connected immedately, check after a small delay
40     EventBaseManager::get()->getEventBase()->tryRunAfterDelay([sock](){
41       CHECK(sock->readable());
42     }, 100);
43     return nullptr;
44   }
45 };
46
47 class TestPipelineFactory : public PipelineFactory<BytesPipeline> {
48  public:
49   std::unique_ptr<BytesPipeline, folly::DelayedDestruction::Destructor> newPipeline(
50     std::shared_ptr<AsyncSocket> sock) {
51
52     pipelines++;
53     return std::unique_ptr<BytesPipeline, folly::DelayedDestruction::Destructor>(
54       new BytesPipeline());
55   }
56   std::atomic<int> pipelines{0};
57 };
58
59 class TestAcceptor : public Acceptor {
60 EventBase base_;
61  public:
62   TestAcceptor() : Acceptor(ServerSocketConfig()) {
63     Acceptor::init(nullptr, &base_);
64   }
65   void onNewConnection(
66       AsyncSocket::UniquePtr sock,
67       const folly::SocketAddress* address,
68       const std::string& nextProtocolName,
69         const TransportInfo& tinfo) {
70   }
71 };
72
73 class TestAcceptorFactory : public AcceptorFactory {
74  public:
75   std::shared_ptr<Acceptor> newAcceptor(EventBase* base) {
76     return std::make_shared<TestAcceptor>();
77   }
78 };
79
80 TEST(Bootstrap, Basic) {
81   TestServer server;
82   TestClient client;
83 }
84
85 TEST(Bootstrap, ServerWithPipeline) {
86   TestServer server;
87   server.childPipeline(std::make_shared<TestPipelineFactory>());
88   server.bind(0);
89   server.stop();
90 }
91
92 TEST(Bootstrap, ServerWithChildHandler) {
93   TestServer server;
94   server.childHandler(std::make_shared<TestAcceptorFactory>());
95   server.bind(0);
96   server.stop();
97 }
98
99 TEST(Bootstrap, ClientServerTest) {
100   TestServer server;
101   auto factory = std::make_shared<TestPipelineFactory>();
102   server.childPipeline(factory);
103   server.bind(0);
104   auto base = EventBaseManager::get()->getEventBase();
105
106   SocketAddress address;
107   server.getSockets()[0]->getAddress(&address);
108
109   TestClient client;
110   client.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
111   client.connect(address);
112   base->loop();
113   server.stop();
114
115   CHECK(factory->pipelines == 1);
116 }
117
118 TEST(Bootstrap, ClientConnectionManagerTest) {
119   // Create a single IO thread, and verify that
120   // client connections are pooled properly
121
122   TestServer server;
123   auto factory = std::make_shared<TestPipelineFactory>();
124   server.childPipeline(factory);
125   server.group(std::make_shared<IOThreadPoolExecutor>(1));
126   server.bind(0);
127   auto base = EventBaseManager::get()->getEventBase();
128
129   SocketAddress address;
130   server.getSockets()[0]->getAddress(&address);
131
132   TestClient client;
133   client.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
134
135   client.connect(address);
136
137   TestClient client2;
138   client2.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
139   client2.connect(address);
140
141   base->loop();
142   server.stop();
143
144   CHECK(factory->pipelines == 2);
145 }
146
147 TEST(Bootstrap, ServerAcceptGroupTest) {
148   // Verify that server is using the accept IO group
149
150   TestServer server;
151   auto factory = std::make_shared<TestPipelineFactory>();
152   server.childPipeline(factory);
153   server.group(std::make_shared<IOThreadPoolExecutor>(1), nullptr);
154   server.bind(0);
155
156   SocketAddress address;
157   server.getSockets()[0]->getAddress(&address);
158
159   boost::barrier barrier(2);
160   auto thread = std::thread([&](){
161     TestClient client;
162     client.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
163     client.connect(address);
164     EventBaseManager::get()->getEventBase()->loop();
165     barrier.wait();
166   });
167   barrier.wait();
168   server.stop();
169   thread.join();
170
171   CHECK(factory->pipelines == 1);
172 }
173
174 TEST(Bootstrap, ServerAcceptGroup2Test) {
175   // Verify that server is using the accept IO group
176
177   // Check if reuse port is supported, if not, don't run this test
178   try {
179     EventBase base;
180     auto serverSocket = AsyncServerSocket::newSocket(&base);
181     serverSocket->bind(0);
182     serverSocket->listen(0);
183     serverSocket->startAccepting();
184     serverSocket->setReusePortEnabled(true);
185     serverSocket->stopAccepting();
186   } catch(...) {
187     LOG(INFO) << "Reuse port probably not supported";
188     return;
189   }
190
191   TestServer server;
192   auto factory = std::make_shared<TestPipelineFactory>();
193   server.childPipeline(factory);
194   server.group(std::make_shared<IOThreadPoolExecutor>(4), nullptr);
195   server.bind(0);
196
197   SocketAddress address;
198   server.getSockets()[0]->getAddress(&address);
199
200   TestClient client;
201   client.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
202
203   client.connect(address);
204   EventBaseManager::get()->getEventBase()->loop();
205
206   server.stop();
207
208   CHECK(factory->pipelines == 1);
209 }
210
211 TEST(Bootstrap, SharedThreadPool) {
212   // Check if reuse port is supported, if not, don't run this test
213   try {
214     EventBase base;
215     auto serverSocket = AsyncServerSocket::newSocket(&base);
216     serverSocket->bind(0);
217     serverSocket->listen(0);
218     serverSocket->startAccepting();
219     serverSocket->setReusePortEnabled(true);
220     serverSocket->stopAccepting();
221   } catch(...) {
222     LOG(INFO) << "Reuse port probably not supported";
223     return;
224   }
225
226   auto pool = std::make_shared<IOThreadPoolExecutor>(2);
227
228   TestServer server;
229   auto factory = std::make_shared<TestPipelineFactory>();
230   server.childPipeline(factory);
231   server.group(pool, pool);
232
233   server.bind(0);
234
235   SocketAddress address;
236   server.getSockets()[0]->getAddress(&address);
237
238   TestClient client;
239   client.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
240   client.connect(address);
241
242   TestClient client2;
243   client2.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
244   client2.connect(address);
245
246   TestClient client3;
247   client3.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
248   client3.connect(address);
249
250   TestClient client4;
251   client4.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
252   client4.connect(address);
253
254   TestClient client5;
255   client5.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
256   client5.connect(address);
257
258   EventBaseManager::get()->getEventBase()->loop();
259
260   server.stop();
261   CHECK(factory->pipelines == 5);
262 }
263
264 TEST(Bootstrap, ExistingSocket) {
265   TestServer server;
266   auto factory = std::make_shared<TestPipelineFactory>();
267   server.childPipeline(factory);
268   folly::AsyncServerSocket::UniquePtr socket(new AsyncServerSocket);
269   server.bind(std::move(socket));
270 }
271
272 std::atomic<int> connections{0};
273
274 class TestHandlerPipeline : public InboundHandler<void*> {
275  public:
276   void read(Context* ctx, void* conn) {
277     connections++;
278     return ctx->fireRead(conn);
279   }
280 };
281
282 template <typename HandlerPipeline>
283 class TestHandlerPipelineFactory
284     : public PipelineFactory<ServerBootstrap<BytesPipeline>::AcceptPipeline> {
285  public:
286   std::unique_ptr<ServerBootstrap<BytesPipeline>::AcceptPipeline,
287                   folly::DelayedDestruction::Destructor>
288   newPipeline(std::shared_ptr<AsyncSocket>) {
289
290     std::unique_ptr<ServerBootstrap<BytesPipeline>::AcceptPipeline,
291                     folly::DelayedDestruction::Destructor> pipeline(
292                       new ServerBootstrap<BytesPipeline>::AcceptPipeline);
293     pipeline->addBack(HandlerPipeline());
294     return pipeline;
295   }
296 };
297
298 TEST(Bootstrap, LoadBalanceHandler) {
299   TestServer server;
300   auto factory = std::make_shared<TestPipelineFactory>();
301   server.childPipeline(factory);
302
303   auto pipelinefactory =
304     std::make_shared<TestHandlerPipelineFactory<TestHandlerPipeline>>();
305   server.pipeline(pipelinefactory);
306   server.bind(0);
307   auto base = EventBaseManager::get()->getEventBase();
308
309   SocketAddress address;
310   server.getSockets()[0]->getAddress(&address);
311
312   TestClient client;
313   client.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
314   client.connect(address);
315   base->loop();
316   server.stop();
317
318   CHECK(factory->pipelines == 1);
319   CHECK(connections == 1);
320 }
321
322 class TestUDPPipeline : public InboundHandler<void*> {
323  public:
324   void read(Context* ctx, void* conn) {
325     connections++;
326   }
327 };
328
329 TEST(Bootstrap, UDP) {
330   TestServer server;
331   auto factory = std::make_shared<TestPipelineFactory>();
332   auto pipelinefactory =
333     std::make_shared<TestHandlerPipelineFactory<TestUDPPipeline>>();
334   server.pipeline(pipelinefactory);
335   server.channelFactory(std::make_shared<AsyncUDPServerSocketFactory>());
336   server.bind(0);
337 }
338
339 TEST(Bootstrap, UDPClientServerTest) {
340   connections = 0;
341
342   TestServer server;
343   auto factory = std::make_shared<TestPipelineFactory>();
344   auto pipelinefactory =
345     std::make_shared<TestHandlerPipelineFactory<TestUDPPipeline>>();
346   server.pipeline(pipelinefactory);
347   server.channelFactory(std::make_shared<AsyncUDPServerSocketFactory>());
348   server.bind(0);
349
350   auto base = EventBaseManager::get()->getEventBase();
351
352   SocketAddress address;
353   server.getSockets()[0]->getAddress(&address);
354
355   SocketAddress localhost("::1", 0);
356   AsyncUDPSocket client(base);
357   client.bind(localhost);
358   auto data = IOBuf::create(1);
359   data->append(1);
360   *(data->writableData()) = 'a';
361   client.write(address, std::move(data));
362   base->loop();
363   server.stop();
364
365   CHECK(connections == 1);
366 }