2 * Copyright 2015 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #include "folly/wangle/bootstrap/ServerBootstrap.h"
18 #include "folly/wangle/bootstrap/ClientBootstrap.h"
19 #include "folly/wangle/channel/Handler.h"
21 #include <glog/logging.h>
22 #include <gtest/gtest.h>
23 #include <boost/thread.hpp>
25 using namespace folly::wangle;
26 using namespace folly;
// Pipeline type shared by every test in this file, parameterized on
// IOBufQueue& and std::unique_ptr<IOBuf>.
28 typedef Pipeline<IOBufQueue&, std::unique_ptr<IOBuf>> BytesPipeline;
// Server and client bootstrap types specialized on BytesPipeline.
30 typedef ServerBootstrap<BytesPipeline> TestServer;
31 typedef ClientBootstrap<BytesPipeline> TestClient;
// PipelineFactory installed on the test clients. newPipeline() receives the
// client socket and schedules a delayed callback, on the EventBase returned
// by EventBaseManager::get()->getEventBase(), that CHECKs the socket is
// readable.
33 class TestClientPipelineFactory : public PipelineFactory<BytesPipeline> {
35 std::unique_ptr<BytesPipeline, folly::DelayedDestruction::Destructor>
36 newPipeline(std::shared_ptr<AsyncSocket> sock) {
37 // We probably aren't connected immediately, check after a small delay
// The lambda captures `sock` by value, i.e. it holds its own shared_ptr copy.
38 EventBaseManager::get()->getEventBase()->tryRunAfterDelay([sock](){
40 CHECK(sock->readable());
// PipelineFactory installed on the test servers via childPipeline().
// newPipeline() takes the accepted socket and returns a BytesPipeline owned
// through folly::DelayedDestruction::Destructor.
46 class TestPipelineFactory : public PipelineFactory<BytesPipeline> {
48 std::unique_ptr<BytesPipeline, folly::DelayedDestruction::Destructor> newPipeline(
49 std::shared_ptr<AsyncSocket> sock) {
52 return std::unique_ptr<BytesPipeline, folly::DelayedDestruction::Destructor>(
// Atomic counter the tests read as factory->pipelines to verify how many
// pipelines were created; presumably incremented once per newPipeline() call
// -- TODO confirm.
55 std::atomic<int> pipelines{0};
// Acceptor subclass used by the ServerWithChildHandler test (through
// TestAcceptorFactory).
58 class TestAcceptor : public Acceptor {
// Builds the base Acceptor from a default ServerSocketConfig, then calls
// Acceptor::init with a null first argument and the address of base_.
61 TestAcceptor() : Acceptor(ServerSocketConfig()) {
62 Acceptor::init(nullptr, &base_);
// NOTE(review): the four parameters below belong to a connection callback
// whose name is not shown here; presumably Acceptor's new-connection hook --
// confirm against the Acceptor interface.
65 AsyncSocket::UniquePtr sock,
66 const folly::SocketAddress* address,
67 const std::string& nextProtocolName,
68 const TransportInfo& tinfo) {
// AcceptorFactory that hands out TestAcceptor instances.
72 class TestAcceptorFactory : public AcceptorFactory {
// Returns a newly allocated TestAcceptor on every call. The `base` argument
// is not used in the visible body (TestAcceptor initializes with its own
// base_ member).
74 std::shared_ptr<Acceptor> newAcceptor(EventBase* base) {
75 return std::make_shared<TestAcceptor>();
// Test case `Basic` of the `Bootstrap` suite.
// NOTE(review): body not shown here; presumably only constructs the
// bootstrap types -- confirm.
79 TEST(Bootstrap, Basic) {
// Configures a server with a TestPipelineFactory as its child pipeline
// factory.
84 TEST(Bootstrap, ServerWithPipeline) {
86 server.childPipeline(std::make_shared<TestPipelineFactory>());
// Configures a server with a TestAcceptorFactory as its child handler,
// instead of a pipeline factory.
91 TEST(Bootstrap, ServerWithChildHandler) {
93 server.childHandler(std::make_shared<TestAcceptorFactory>());
// One server, one client: the client connects to the address of the server's
// first socket, and the test expects the server-side factory to have created
// exactly one pipeline.
98 TEST(Bootstrap, ClientServerTest) {
// Keep a handle on the factory so its `pipelines` counter can be checked.
100 auto factory = std::make_shared<TestPipelineFactory>();
101 server.childPipeline(factory);
103 auto base = EventBaseManager::get()->getEventBase();
// Ask the server's first socket for its address.
105 SocketAddress address;
106 server.getSockets()[0]->getAddress(&address);
109 client.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
110 client.connect(address);
114 CHECK(factory->pipelines == 1);
// Two clients connect to a server whose group is a one-thread
// IOThreadPoolExecutor; the test expects two server-side pipelines.
117 TEST(Bootstrap, ClientConnectionManagerTest) {
118 // Create a single IO thread, and verify that
119 // client connections are pooled properly
122 auto factory = std::make_shared<TestPipelineFactory>();
123 server.childPipeline(factory);
// Single-threaded executor passed as the server's group.
124 server.group(std::make_shared<IOThreadPoolExecutor>(1));
126 auto base = EventBaseManager::get()->getEventBase();
// Ask the server's first socket for its address.
128 SocketAddress address;
129 server.getSockets()[0]->getAddress(&address);
// First client.
132 client.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
134 client.connect(address);
// Second client, same target address.
137 client2.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
138 client2.connect(address);
// One pipeline per client connection.
143 CHECK(factory->pipelines == 2);
// Server configured through the two-argument group() overload; a client
// running on its own std::thread connects, and the test expects one
// server-side pipeline.
146 TEST(Bootstrap, ServerAcceptGroupTest) {
147 // Verify that server is using the accept IO group
150 auto factory = std::make_shared<TestPipelineFactory>();
151 server.childPipeline(factory);
// One-thread executor as the first group argument, nullptr as the second.
// Presumably (accept group, IO group) -- TODO confirm against
// ServerBootstrap::group.
152 server.group(std::make_shared<IOThreadPoolExecutor>(1), nullptr);
155 SocketAddress address;
156 server.getSockets()[0]->getAddress(&address);
// Barrier with a count of 2; presumably synchronizes this thread with the
// client thread below -- TODO confirm where wait() is called.
158 boost::barrier barrier(2);
// The client thread captures locals by reference ([&]), connects, and then
// runs the loop of the EventBase obtained inside that thread.
159 auto thread = std::thread([&](){
161 client.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
162 client.connect(address);
163 EventBaseManager::get()->getEventBase()->loop();
170 CHECK(factory->pipelines == 1);
// Variant of ServerAcceptGroupTest using a four-thread executor. It first
// probes SO_REUSEPORT-style support on a scratch server socket, then runs a
// single client on the calling thread and expects one server-side pipeline.
173 TEST(Bootstrap, ServerAcceptGroup2Test) {
174 // Verify that server is using the accept IO group
176 // Check if reuse port is supported, if not, don't run this test
// Probe: bind a scratch server socket to port 0, listen, start accepting,
// enable reuse-port, then stop accepting.
179 auto serverSocket = AsyncServerSocket::newSocket(&base);
180 serverSocket->bind(0);
181 serverSocket->listen(0);
182 serverSocket->startAccepting();
183 serverSocket->setReusePortEnabled(true);
184 serverSocket->stopAccepting();
// Presumably logged from the failure path of the probe above, followed by an
// early return -- TODO confirm.
186 LOG(INFO) << "Reuse port probably not supported";
191 auto factory = std::make_shared<TestPipelineFactory>();
192 server.childPipeline(factory);
// Four-thread executor as the first group argument, nullptr as the second.
193 server.group(std::make_shared<IOThreadPoolExecutor>(4), nullptr);
196 SocketAddress address;
197 server.getSockets()[0]->getAddress(&address);
200 client.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
202 client.connect(address);
// Run the EventBase loop on this thread so the client connection progresses.
203 EventBaseManager::get()->getEventBase()->loop();
207 CHECK(factory->pipelines == 1);
// Passes the same two-thread IOThreadPoolExecutor as both arguments of
// server.group(), connects five clients, and expects five server-side
// pipelines. Starts with the same reuse-port probe as ServerAcceptGroup2Test.
210 TEST(Bootstrap, SharedThreadPool) {
211 // Check if reuse port is supported, if not, don't run this test
// Probe: bind a scratch server socket to port 0, listen, start accepting,
// enable reuse-port, then stop accepting.
214 auto serverSocket = AsyncServerSocket::newSocket(&base);
215 serverSocket->bind(0);
216 serverSocket->listen(0);
217 serverSocket->startAccepting();
218 serverSocket->setReusePortEnabled(true);
219 serverSocket->stopAccepting();
// Presumably logged from the failure path of the probe above, followed by an
// early return -- TODO confirm.
221 LOG(INFO) << "Reuse port probably not supported";
// A single pool shared by both group roles.
225 auto pool = std::make_shared<IOThreadPoolExecutor>(2);
228 auto factory = std::make_shared<TestPipelineFactory>();
229 server.childPipeline(factory);
230 server.group(pool, pool);
234 SocketAddress address;
235 server.getSockets()[0]->getAddress(&address);
// Five independent clients, all connecting to the same server address.
238 client.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
239 client.connect(address);
242 client2.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
243 client2.connect(address);
246 client3.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
247 client3.connect(address);
250 client4.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
251 client4.connect(address);
254 client5.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
255 client5.connect(address);
// Run the EventBase loop on this thread so the client connections progress.
257 EventBaseManager::get()->getEventBase()->loop();
// One pipeline per client connection.
260 CHECK(factory->pipelines == 5);
// Binds the server to an AsyncServerSocket constructed by the test itself
// rather than one created by the bootstrap.
263 TEST(Bootstrap, ExistingSocket) {
265 auto factory = std::make_shared<TestPipelineFactory>();
266 server.childPipeline(factory);
// Ownership of the socket is moved into server.bind().
267 folly::AsyncServerSocket::UniquePtr socket(new AsyncServerSocket);
268 server.bind(std::move(socket));
// File-level atomic counter, zero-initialized. The LoadBalanceHandler and
// UDPClientServerTest cases check that it equals 1; presumably incremented
// by the accept-pipeline handlers' read() -- TODO confirm.
271 std::atomic<int> connections{0};
// Inbound handler over void* that is installed in the server's accept
// pipeline by the LoadBalanceHandler test.
273 class TestHandlerPipeline : public InboundHandler<void*> {
// Forwards the received value to the next handler via ctx->fireRead().
275 void read(Context* ctx, void* conn) {
277 return ctx->fireRead(conn);
// PipelineFactory for the server's AcceptPipeline, templated on the handler
// type to install. HandlerPipeline must be default-constructible, since an
// instance is created with HandlerPipeline() below.
281 template <typename HandlerPipeline>
282 class TestHandlerPipelineFactory
283 : public PipelineFactory<ServerBootstrap<BytesPipeline>::AcceptPipeline> {
// Builds a new AcceptPipeline and appends one HandlerPipeline instance to
// it. The socket argument is unnamed and therefore unused.
285 std::unique_ptr<ServerBootstrap<BytesPipeline>::AcceptPipeline,
286 folly::DelayedDestruction::Destructor>
287 newPipeline(std::shared_ptr<AsyncSocket>) {
289 std::unique_ptr<ServerBootstrap<BytesPipeline>::AcceptPipeline,
290 folly::DelayedDestruction::Destructor> pipeline(
291 new ServerBootstrap<BytesPipeline>::AcceptPipeline);
292 pipeline->addBack(HandlerPipeline());
// Installs a custom accept pipeline (TestHandlerPipeline) alongside the
// usual child pipeline factory, connects one client, and expects both one
// child pipeline and a `connections` count of one.
297 TEST(Bootstrap, LoadBalanceHandler) {
299 auto factory = std::make_shared<TestPipelineFactory>();
300 server.childPipeline(factory);
// Factory for the server's accept pipeline.
302 auto pipelinefactory =
303 std::make_shared<TestHandlerPipelineFactory<TestHandlerPipeline>>();
304 server.pipeline(pipelinefactory);
306 auto base = EventBaseManager::get()->getEventBase();
308 SocketAddress address;
309 server.getSockets()[0]->getAddress(&address);
312 client.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
313 client.connect(address);
317 CHECK(factory->pipelines == 1);
318 CHECK(connections == 1);
// Inbound handler over void* installed in the accept pipeline by the UDP
// tests.
321 class TestUDPPipeline : public InboundHandler<void*> {
// NOTE(review): body not shown here; presumably bumps `connections`, since
// UDPClientServerTest checks that counter -- confirm.
323 void read(Context* ctx, void* conn) {
// Configures a server with a TestUDPPipeline accept pipeline and an
// AsyncUDPServerSocketFactory as its channel factory.
328 TEST(Bootstrap, UDP) {
// NOTE(review): `factory` is not referenced by any other visible line of
// this test.
330 auto factory = std::make_shared<TestPipelineFactory>();
331 auto pipelinefactory =
332 std::make_shared<TestHandlerPipelineFactory<TestUDPPipeline>>();
333 server.pipeline(pipelinefactory);
334 server.channelFactory(std::make_shared<AsyncUDPServerSocketFactory>());
// UDP round trip: a server set up as in the UDP test, plus an AsyncUDPSocket
// client that sends a buffer containing 'a' to the server's address. Expects
// the `connections` counter to equal 1 afterwards.
338 TEST(Bootstrap, UDPClientServerTest) {
// NOTE(review): `factory` is not referenced by any other visible line of
// this test.
342 auto factory = std::make_shared<TestPipelineFactory>();
343 auto pipelinefactory =
344 std::make_shared<TestHandlerPipelineFactory<TestUDPPipeline>>();
345 server.pipeline(pipelinefactory);
346 server.channelFactory(std::make_shared<AsyncUDPServerSocketFactory>());
349 auto base = EventBaseManager::get()->getEventBase();
// Ask the server's first socket for its address.
351 SocketAddress address;
352 server.getSockets()[0]->getAddress(&address);
// Client socket bound to the IPv6 loopback address with port 0.
354 SocketAddress localhost("::1", 0);
355 AsyncUDPSocket client(base);
356 client.bind(localhost);
// Buffer created with capacity 1; its first writable byte is set to 'a' and
// ownership of the buffer is moved into write().
357 auto data = IOBuf::create(1);
359 *(data->writableData()) = 'a';
360 client.write(address, std::move(data));
364 CHECK(connections == 1);