2 * Copyright 2015 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #include "folly/wangle/bootstrap/ServerBootstrap.h"
18 #include "folly/wangle/bootstrap/ClientBootstrap.h"
19 #include "folly/wangle/channel/Handler.h"
21 #include <glog/logging.h>
22 #include <gtest/gtest.h>
23 #include <boost/thread.hpp>
25 using namespace folly::wangle;
26 using namespace folly;
28 typedef Pipeline<IOBufQueue&, std::unique_ptr<IOBuf>> BytesPipeline;
30 typedef ServerBootstrap<BytesPipeline> TestServer;
31 typedef ClientBootstrap<BytesPipeline> TestClient;
33 class TestClientPipelineFactory : public PipelineFactory<BytesPipeline> {
35 std::unique_ptr<BytesPipeline, folly::DelayedDestruction::Destructor>
36 newPipeline(std::shared_ptr<AsyncSocket> sock) {
39 // We probably aren't connected immedately, check after a small delay
40 EventBaseManager::get()->getEventBase()->tryRunAfterDelay([sock](){
41 CHECK(sock->readable());
47 class TestPipelineFactory : public PipelineFactory<BytesPipeline> {
49 std::unique_ptr<BytesPipeline, folly::DelayedDestruction::Destructor> newPipeline(
50 std::shared_ptr<AsyncSocket> sock) {
53 return std::unique_ptr<BytesPipeline, folly::DelayedDestruction::Destructor>(
56 std::atomic<int> pipelines{0};
59 class TestAcceptor : public Acceptor {
62 TestAcceptor() : Acceptor(ServerSocketConfig()) {
63 Acceptor::init(nullptr, &base_);
66 AsyncSocket::UniquePtr sock,
67 const folly::SocketAddress* address,
68 const std::string& nextProtocolName,
69 const TransportInfo& tinfo) {
73 class TestAcceptorFactory : public AcceptorFactory {
75 std::shared_ptr<Acceptor> newAcceptor(EventBase* base) {
76 return std::make_shared<TestAcceptor>();
80 TEST(Bootstrap, Basic) {
85 TEST(Bootstrap, ServerWithPipeline) {
87 server.childPipeline(std::make_shared<TestPipelineFactory>());
92 TEST(Bootstrap, ServerWithChildHandler) {
94 server.childHandler(std::make_shared<TestAcceptorFactory>());
99 TEST(Bootstrap, ClientServerTest) {
101 auto factory = std::make_shared<TestPipelineFactory>();
102 server.childPipeline(factory);
104 auto base = EventBaseManager::get()->getEventBase();
106 SocketAddress address;
107 server.getSockets()[0]->getAddress(&address);
110 client.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
111 client.connect(address);
115 CHECK(factory->pipelines == 1);
118 TEST(Bootstrap, ClientConnectionManagerTest) {
119 // Create a single IO thread, and verify that
120 // client connections are pooled properly
123 auto factory = std::make_shared<TestPipelineFactory>();
124 server.childPipeline(factory);
125 server.group(std::make_shared<IOThreadPoolExecutor>(1));
127 auto base = EventBaseManager::get()->getEventBase();
129 SocketAddress address;
130 server.getSockets()[0]->getAddress(&address);
133 client.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
135 client.connect(address);
138 client2.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
139 client2.connect(address);
144 CHECK(factory->pipelines == 2);
147 TEST(Bootstrap, ServerAcceptGroupTest) {
148 // Verify that server is using the accept IO group
151 auto factory = std::make_shared<TestPipelineFactory>();
152 server.childPipeline(factory);
153 server.group(std::make_shared<IOThreadPoolExecutor>(1), nullptr);
156 SocketAddress address;
157 server.getSockets()[0]->getAddress(&address);
159 boost::barrier barrier(2);
160 auto thread = std::thread([&](){
162 client.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
163 client.connect(address);
164 EventBaseManager::get()->getEventBase()->loop();
171 CHECK(factory->pipelines == 1);
174 TEST(Bootstrap, ServerAcceptGroup2Test) {
175 // Verify that server is using the accept IO group
177 // Check if reuse port is supported, if not, don't run this test
180 auto serverSocket = AsyncServerSocket::newSocket(&base);
181 serverSocket->bind(0);
182 serverSocket->listen(0);
183 serverSocket->startAccepting();
184 serverSocket->setReusePortEnabled(true);
185 serverSocket->stopAccepting();
187 LOG(INFO) << "Reuse port probably not supported";
192 auto factory = std::make_shared<TestPipelineFactory>();
193 server.childPipeline(factory);
194 server.group(std::make_shared<IOThreadPoolExecutor>(4), nullptr);
197 SocketAddress address;
198 server.getSockets()[0]->getAddress(&address);
201 client.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
203 client.connect(address);
204 EventBaseManager::get()->getEventBase()->loop();
208 CHECK(factory->pipelines == 1);
211 TEST(Bootstrap, SharedThreadPool) {
212 // Check if reuse port is supported, if not, don't run this test
215 auto serverSocket = AsyncServerSocket::newSocket(&base);
216 serverSocket->bind(0);
217 serverSocket->listen(0);
218 serverSocket->startAccepting();
219 serverSocket->setReusePortEnabled(true);
220 serverSocket->stopAccepting();
222 LOG(INFO) << "Reuse port probably not supported";
226 auto pool = std::make_shared<IOThreadPoolExecutor>(2);
229 auto factory = std::make_shared<TestPipelineFactory>();
230 server.childPipeline(factory);
231 server.group(pool, pool);
235 SocketAddress address;
236 server.getSockets()[0]->getAddress(&address);
239 client.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
240 client.connect(address);
243 client2.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
244 client2.connect(address);
247 client3.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
248 client3.connect(address);
251 client4.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
252 client4.connect(address);
255 client5.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
256 client5.connect(address);
258 EventBaseManager::get()->getEventBase()->loop();
261 CHECK(factory->pipelines == 5);
264 TEST(Bootstrap, ExistingSocket) {
266 auto factory = std::make_shared<TestPipelineFactory>();
267 server.childPipeline(factory);
268 folly::AsyncServerSocket::UniquePtr socket(new AsyncServerSocket);
269 server.bind(std::move(socket));
272 std::atomic<int> connections{0};
274 class TestHandlerPipeline : public InboundHandler<void*> {
276 void read(Context* ctx, void* conn) {
278 return ctx->fireRead(conn);
282 template <typename HandlerPipeline>
283 class TestHandlerPipelineFactory
284 : public PipelineFactory<ServerBootstrap<BytesPipeline>::AcceptPipeline> {
286 std::unique_ptr<ServerBootstrap<BytesPipeline>::AcceptPipeline,
287 folly::DelayedDestruction::Destructor>
288 newPipeline(std::shared_ptr<AsyncSocket>) {
290 std::unique_ptr<ServerBootstrap<BytesPipeline>::AcceptPipeline,
291 folly::DelayedDestruction::Destructor> pipeline(
292 new ServerBootstrap<BytesPipeline>::AcceptPipeline);
293 pipeline->addBack(HandlerPipeline());
298 TEST(Bootstrap, LoadBalanceHandler) {
300 auto factory = std::make_shared<TestPipelineFactory>();
301 server.childPipeline(factory);
303 auto pipelinefactory =
304 std::make_shared<TestHandlerPipelineFactory<TestHandlerPipeline>>();
305 server.pipeline(pipelinefactory);
307 auto base = EventBaseManager::get()->getEventBase();
309 SocketAddress address;
310 server.getSockets()[0]->getAddress(&address);
313 client.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
314 client.connect(address);
318 CHECK(factory->pipelines == 1);
319 CHECK(connections == 1);
322 class TestUDPPipeline : public InboundHandler<void*> {
324 void read(Context* ctx, void* conn) {
329 TEST(Bootstrap, UDP) {
331 auto factory = std::make_shared<TestPipelineFactory>();
332 auto pipelinefactory =
333 std::make_shared<TestHandlerPipelineFactory<TestUDPPipeline>>();
334 server.pipeline(pipelinefactory);
335 server.channelFactory(std::make_shared<AsyncUDPServerSocketFactory>());
339 TEST(Bootstrap, UDPClientServerTest) {
343 auto factory = std::make_shared<TestPipelineFactory>();
344 auto pipelinefactory =
345 std::make_shared<TestHandlerPipelineFactory<TestUDPPipeline>>();
346 server.pipeline(pipelinefactory);
347 server.channelFactory(std::make_shared<AsyncUDPServerSocketFactory>());
350 auto base = EventBaseManager::get()->getEventBase();
352 SocketAddress address;
353 server.getSockets()[0]->getAddress(&address);
355 SocketAddress localhost("::1", 0);
356 AsyncUDPSocket client(base);
357 client.bind(localhost);
358 auto data = IOBuf::create(1);
360 *(data->writableData()) = 'a';
361 client.write(address, std::move(data));
365 CHECK(connections == 1);