2 * Copyright 2015 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #include "folly/wangle/bootstrap/ServerBootstrap.h"
18 #include "folly/wangle/bootstrap/ClientBootstrap.h"
19 #include "folly/wangle/channel/Handler.h"
21 #include <glog/logging.h>
22 #include <gtest/gtest.h>
23 #include <boost/thread.hpp>
25 using namespace folly::wangle;
26 using namespace folly;
// Pipeline type shared by every test in this file, instantiated with
// IOBufQueue& and std::unique_ptr<IOBuf> (presumably the read and write
// types -- confirm against the Pipeline template).
28 typedef Pipeline<IOBufQueue&, std::unique_ptr<IOBuf>> BytesPipeline;
// Server and client bootstraps specialised on BytesPipeline.
30 typedef ServerBootstrap<BytesPipeline> TestServer;
31 typedef ClientBootstrap<BytesPipeline> TestClient;
// Pipeline factory that the tests install on each client through
// pipelineFactory().
33 class TestClientPipelineFactory : public PipelineFactory<BytesPipeline> {
// Receives the client socket for which a pipeline is being created.
35 BytesPipeline* newPipeline(std::shared_ptr<AsyncSocket> sock) {
38 // We probably aren't connected immediately, check after a small delay
// The lambda captures `sock` by value, so the delayed callback holds its own
// shared_ptr reference to the socket.
39 EventBaseManager::get()->getEventBase()->tryRunAfterDelay([sock](){
40 CHECK(sock->readable());
// Pipeline factory that the tests install on the server through
// childPipeline().
46 class TestPipelineFactory : public PipelineFactory<BytesPipeline> {
48 BytesPipeline* newPipeline(std::shared_ptr<AsyncSocket> sock) {
// Hands back a raw pointer to a newly allocated, empty BytesPipeline.
50 return new BytesPipeline();
// Counter the tests assert on after connecting clients (e.g.
// `factory->pipelines == 1`). Atomic, presumably because newPipeline may be
// called from IO threads -- confirm.
52 std::atomic<int> pipelines{0};
// Acceptor subclass produced by TestAcceptorFactory.
55 class TestAcceptor : public Acceptor {
// Builds the base Acceptor from a default ServerSocketConfig, then initialises
// it with a null first argument and the member event base `base_`.
58 TestAcceptor() : Acceptor(ServerSocketConfig()) {
59 Acceptor::init(nullptr, &base_);
// Parameter list of a member function whose name is not visible in this view:
// the socket (owned via UniquePtr), an address, the next-protocol name and
// the transport info.
62 AsyncSocket::UniquePtr sock,
63 const folly::SocketAddress* address,
64 const std::string& nextProtocolName,
65 const TransportInfo& tinfo) {
// AcceptorFactory that the ServerWithChildHandler test passes to
// childHandler().
69 class TestAcceptorFactory : public AcceptorFactory {
// Returns a fresh TestAcceptor; the `base` argument is not used in the
// visible code.
71 std::shared_ptr<Acceptor> newAcceptor(EventBase* base) {
72 return std::make_shared<TestAcceptor>();
// Bootstrap.Basic test case. NOTE(review): the body is not visible in this
// view; presumably it only constructs the bootstrap types -- confirm.
76 TEST(Bootstrap, Basic) {
// Configures a server with a TestPipelineFactory as its child pipeline
// factory.
81 TEST(Bootstrap, ServerWithPipeline) {
83 server.childPipeline(std::make_shared<TestPipelineFactory>());
// Configures a server with a custom acceptor factory (TestAcceptorFactory)
// instead of a pipeline factory.
88 TEST(Bootstrap, ServerWithChildHandler) {
90 server.childHandler(std::make_shared<TestAcceptorFactory>());
// Connects one client to one server and checks the pipeline count recorded
// by the server-side factory.
95 TEST(Bootstrap, ClientServerTest) {
// Keep the factory in a local so its `pipelines` counter can be checked below.
97 auto factory = std::make_shared<TestPipelineFactory>();
98 server.childPipeline(factory);
// EventBase obtained from the EventBaseManager.
100 auto base = EventBaseManager::get()->getEventBase();
// Ask the first server socket which address it ended up on.
102 SocketAddress address;
103 server.getSockets()[0]->getAddress(&address);
106 client.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
107 client.connect(address);
// Exactly one pipeline is expected for the single client.
111 CHECK(factory->pipelines == 1);
// Connects two clients to a server backed by a one-thread IO pool and checks
// that the server-side factory recorded two pipelines.
114 TEST(Bootstrap, ClientConnectionManagerTest) {
115 // Create a single IO thread, and verify that
116 // client connections are pooled properly
119 auto factory = std::make_shared<TestPipelineFactory>();
120 server.childPipeline(factory);
// The executor is constructed with 1, matching the "single IO thread" above.
121 server.group(std::make_shared<IOThreadPoolExecutor>(1));
123 auto base = EventBaseManager::get()->getEventBase();
// Ask the first server socket which address it ended up on.
125 SocketAddress address;
126 server.getSockets()[0]->getAddress(&address);
// First client.
129 client.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
131 client.connect(address);
// Second client, connecting to the same address.
134 client2.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
135 client2.connect(address);
// One pipeline per client is expected.
140 CHECK(factory->pipelines == 2);
// Runs the client on a separate thread against a server configured through
// the two-argument group() call.
143 TEST(Bootstrap, ServerAcceptGroupTest) {
144 // Verify that server is using the accept IO group
147 auto factory = std::make_shared<TestPipelineFactory>();
148 server.childPipeline(factory);
// Two-argument group(): a one-thread executor first, nullptr second
// (presumably accept group, then IO group -- confirm against ServerBootstrap).
149 server.group(std::make_shared<IOThreadPoolExecutor>(1), nullptr);
152 SocketAddress address;
153 server.getSockets()[0]->getAddress(&address);
// Barrier for two participants; its wait() calls are not visible in this view.
155 boost::barrier barrier(2);
// The lambda captures by reference, so `address` (and anything else it uses)
// must outlive the thread.
156 auto thread = std::thread([&](){
158 client.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
159 client.connect(address);
// Run the event loop of the EventBase returned by the manager on this thread.
160 EventBaseManager::get()->getEventBase()->loop();
// Exactly one pipeline is expected for the single client.
167 CHECK(factory->pipelines == 1);
// Same check as ServerAcceptGroupTest, but with a four-thread executor as the
// first group() argument; preceded by a probe for reuse-port support.
170 TEST(Bootstrap, ServerAcceptGroup2Test) {
171 // Verify that server is using the accept IO group
173 // Check if reuse port is supported, if not, don't run this test
// Probe: set up a throwaway listening socket on port 0 and try to enable
// reuse-port on it.
176 auto serverSocket = AsyncServerSocket::newSocket(&base);
177 serverSocket->bind(0);
178 serverSocket->listen(0);
179 serverSocket->startAccepting();
180 serverSocket->setReusePortEnabled(true);
181 serverSocket->stopAccepting();
// Logged when the probe fails (the surrounding error handling is not visible
// in this view).
183 LOG(INFO) << "Reuse port probably not supported";
188 auto factory = std::make_shared<TestPipelineFactory>();
189 server.childPipeline(factory);
// Four-thread executor as the first group, nullptr as the second.
190 server.group(std::make_shared<IOThreadPoolExecutor>(4), nullptr);
193 SocketAddress address;
194 server.getSockets()[0]->getAddress(&address);
197 client.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
199 client.connect(address);
// Run the event loop of the EventBase returned by the manager.
200 EventBaseManager::get()->getEventBase()->loop();
// Exactly one pipeline is expected for the single client.
204 CHECK(factory->pipelines == 1);
// Uses one two-thread executor for both group() arguments and connects five
// clients; preceded by a probe for reuse-port support.
207 TEST(Bootstrap, SharedThreadPool) {
208 // Check if reuse port is supported, if not, don't run this test
// Probe: set up a throwaway listening socket on port 0 and try to enable
// reuse-port on it.
211 auto serverSocket = AsyncServerSocket::newSocket(&base);
212 serverSocket->bind(0);
213 serverSocket->listen(0);
214 serverSocket->startAccepting();
215 serverSocket->setReusePortEnabled(true);
216 serverSocket->stopAccepting();
// Logged when the probe fails (the surrounding error handling is not visible
// in this view).
218 LOG(INFO) << "Reuse port probably not supported";
// Single executor instance shared by both group() arguments below.
222 auto pool = std::make_shared<IOThreadPoolExecutor>(2);
225 auto factory = std::make_shared<TestPipelineFactory>();
226 server.childPipeline(factory);
227 server.group(pool, pool);
231 SocketAddress address;
232 server.getSockets()[0]->getAddress(&address);
// Five clients, all connecting to the same server address.
235 client.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
236 client.connect(address);
239 client2.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
240 client2.connect(address);
243 client3.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
244 client3.connect(address);
247 client4.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
248 client4.connect(address);
251 client5.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
252 client5.connect(address);
// Run the event loop of the EventBase returned by the manager.
254 EventBaseManager::get()->getEventBase()->loop();
// One pipeline per client is expected.
257 CHECK(factory->pipelines == 5);
// Binds the server to a caller-created AsyncServerSocket rather than letting
// the bootstrap create one.
260 TEST(Bootstrap, ExistingSocket) {
262 auto factory = std::make_shared<TestPipelineFactory>();
263 server.childPipeline(factory);
// Ownership of the socket is moved into bind().
264 folly::AsyncServerSocket::UniquePtr socket(new AsyncServerSocket);
265 server.bind(std::move(socket));
// File-level counter asserted on by the LoadBalanceHandler and
// UDPClientServerTest tests (`connections == 1`).
268 std::atomic<int> connections{0};
// Inbound handler over void* that is plugged into the server's accept
// pipeline by TestHandlerPipelineFactory.
270 class TestHandlerPipeline : public InboundHandler<void*> {
272 void read(Context* ctx, void* conn) {
// Pass `conn` on unchanged through the context.
274 return ctx->fireRead(conn);
// Factory for the server's AcceptPipeline, parameterised on the handler type
// to install. HandlerPipeline must be default-constructible.
278 template <typename HandlerPipeline>
279 class TestHandlerPipelineFactory
280 : public PipelineFactory<ServerBootstrap<BytesPipeline>::AcceptPipeline> {
// The socket argument is unnamed and therefore unused.
282 ServerBootstrap<BytesPipeline>::AcceptPipeline* newPipeline(std::shared_ptr<AsyncSocket>) {
// Allocate the accept pipeline and add a default-constructed handler via
// addBack().
283 auto pipeline = new ServerBootstrap<BytesPipeline>::AcceptPipeline;
284 pipeline->addBack(HandlerPipeline());
// Installs a custom accept pipeline (TestHandlerPipeline) on the server,
// connects one client, and checks both the pipeline and connection counters.
289 TEST(Bootstrap, LoadBalanceHandler) {
291 auto factory = std::make_shared<TestPipelineFactory>();
292 server.childPipeline(factory);
// Accept-pipeline factory, registered through server.pipeline().
294 auto pipelinefactory =
295 std::make_shared<TestHandlerPipelineFactory<TestHandlerPipeline>>();
296 server.pipeline(pipelinefactory);
298 auto base = EventBaseManager::get()->getEventBase();
// Ask the first server socket which address it ended up on.
300 SocketAddress address;
301 server.getSockets()[0]->getAddress(&address);
304 client.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
305 client.connect(address);
// One child pipeline and one counted connection are expected.
309 CHECK(factory->pipelines == 1);
310 CHECK(connections == 1);
// Inbound handler over void* used as the accept-pipeline handler in the UDP
// tests. NOTE(review): the body of read() is not visible in this view.
313 class TestUDPPipeline : public InboundHandler<void*> {
315 void read(Context* ctx, void* conn) {
// Configures a server with the UDP handler pipeline and a UDP server socket
// factory.
320 TEST(Bootstrap, UDP) {
// NOTE(review): `factory` is not referenced again in the visible lines.
322 auto factory = std::make_shared<TestPipelineFactory>();
323 auto pipelinefactory =
324 std::make_shared<TestHandlerPipelineFactory<TestUDPPipeline>>();
325 server.pipeline(pipelinefactory);
// Use the UDP server socket factory as the channel factory.
326 server.channelFactory(std::make_shared<AsyncUDPServerSocketFactory>());
// Sends one datagram from an AsyncUDPSocket client to a UDP-configured server
// and checks the global `connections` counter.
330 TEST(Bootstrap, UDPClientServerTest) {
// NOTE(review): `factory` is not referenced again in the visible lines.
334 auto factory = std::make_shared<TestPipelineFactory>();
335 auto pipelinefactory =
336 std::make_shared<TestHandlerPipelineFactory<TestUDPPipeline>>();
337 server.pipeline(pipelinefactory);
338 server.channelFactory(std::make_shared<AsyncUDPServerSocketFactory>());
341 auto base = EventBaseManager::get()->getEventBase();
// Ask the first server socket which address it ended up on.
343 SocketAddress address;
344 server.getSockets()[0]->getAddress(&address);
// Bind the client to the IPv6 loopback address with port 0.
346 SocketAddress localhost("::1", 0);
347 AsyncUDPSocket client(base);
348 client.bind(localhost);
// IOBuf created with capacity argument 1; its first byte is set to 'a'.
// NOTE(review): no call extending the buffer's length is visible here --
// confirm the payload length is set before the write.
349 auto data = IOBuf::create(1);
351 *(data->writableData()) = 'a';
// Send the buffer to the server address; ownership moves into write().
352 client.write(address, std::move(data));
// One counted connection is expected for the single datagram.
356 CHECK(connections == 1);