Multi accept
authordavejwatson <davejwatson@fb.com>
Wed, 10 Dec 2014 15:58:27 +0000 (07:58 -0800)
committerDave Watson <davejwatson@fb.com>
Thu, 11 Dec 2014 16:02:00 +0000 (08:02 -0800)
Summary: Use acceptor pool instead of current thread for accepts.  Use reuse_port option to support multiple threads

Test Plan: Updated unittests

Reviewed By: hans@fb.com

Subscribers: doug, fugalh, njormrod, folly-diffs@

FB internal diff: D1710619

Tasks: 5788110

Signature: t1:1710619:1417477350:eee5063186e582ef74c4802b8149563af029b3de

folly/experimental/wangle/bootstrap/BootstrapTest.cpp
folly/experimental/wangle/bootstrap/ServerBootstrap.cpp
folly/experimental/wangle/bootstrap/ServerBootstrap.h

index 0d0d843fbb18f744b5eb095b4adcdc708331c6a8..b3cf00da0e8f96a88196412824f1ad0c6361b7f4 100644 (file)
@@ -62,11 +62,7 @@ TEST(Bootstrap, ServerWithPipeline) {
   TestServer server;
   server.childPipeline(std::make_shared<TestPipelineFactory>());
   server.bind(0);
-  auto base = EventBaseManager::get()->getEventBase();
-  base->runAfterDelay([&](){
-    server.stop();
-  }, 500);
-  base->loop();
+  server.stop();
 }
 
 TEST(Bootstrap, ClientServerTest) {
@@ -81,10 +77,8 @@ TEST(Bootstrap, ClientServerTest) {
 
   TestClient client;
   client.connect(address);
-  base->runAfterDelay([&](){
-    server.stop();
-  }, 500);
   base->loop();
+  server.stop();
 
   CHECK(factory->pipelines == 1);
 }
@@ -109,11 +103,55 @@ TEST(Bootstrap, ClientConnectionManagerTest) {
   TestClient client2;
   client2.connect(address);
 
-  base->runAfterDelay([&](){
-    server.stop();
-  }, 500);
-
   base->loop();
+  server.stop();
 
   CHECK(factory->pipelines == 2);
 }
+
+TEST(Bootstrap, ServerAcceptGroupTest) {
+  // Verify that server is using the accept IO group
+
+  TestServer server;
+  auto factory = std::make_shared<TestPipelineFactory>();
+  server.childPipeline(factory);
+  server.group(std::make_shared<IOThreadPoolExecutor>(1), nullptr);
+  server.bind(0);
+
+  SocketAddress address;
+  server.getSockets()[0]->getAddress(&address);
+
+  boost::barrier barrier(2);
+  auto thread = std::thread([&](){
+    TestClient client;
+    client.connect(address);
+    EventBaseManager::get()->getEventBase()->loop();
+    barrier.wait();
+  });
+  barrier.wait();
+  server.stop();
+  thread.join();
+
+  CHECK(factory->pipelines == 1);
+}
+
+TEST(Bootstrap, ServerAcceptGroup2Test) {
+  // Verify that server is using the accept IO group
+
+  TestServer server;
+  auto factory = std::make_shared<TestPipelineFactory>();
+  server.childPipeline(factory);
+  server.group(std::make_shared<IOThreadPoolExecutor>(4), nullptr);
+  server.bind(0);
+
+  SocketAddress address;
+  server.getSockets()[0]->getAddress(&address);
+
+  TestClient client;
+  client.connect(address);
+  EventBaseManager::get()->getEventBase()->loop();
+
+  server.stop();
+
+  CHECK(factory->pipelines == 1);
+}
index 17e4bb8659a87561733b5c426c45485da1cea6fc..594e960467e6ef57781462f3d7652e26e288b3e6 100644 (file)
@@ -21,13 +21,13 @@ namespace folly {
 
 std::thread ServerWorkerFactory::newThread(
     folly::wangle::Func&& func) {
+  auto id = nextWorkerId_++;
+  auto worker = acceptorFactory_->newAcceptor();
+  {
+    folly::RWSpinLock::WriteHolder guard(workersLock_);
+    workers_.insert({id, worker});
+  }
   return internalFactory_->newThread([=](){
-    auto id = nextWorkerId_++;
-    auto worker = acceptorFactory_->newAcceptor();
-    {
-      folly::RWSpinLock::WriteHolder guard(workersLock_);
-      workers_.insert({id, worker});
-    }
     EventBaseManager::get()->setEventBase(worker->getEventBase(), false);
     func();
     EventBaseManager::get()->clearEventBase();
index 4d55c4e36b5f6ed6a216399fa5fc1b9ff7d9dab0..f7389d98d2c1928da26c426697819e7463c3c792 100644 (file)
@@ -16,6 +16,7 @@
 #pragma once
 
 #include <folly/experimental/wangle/bootstrap/ServerBootstrap-inl.h>
+#include <boost/thread.hpp>
 
 namespace folly {
 
@@ -137,37 +138,80 @@ class ServerBootstrap {
    * @param port Port to listen on
    */
   void bind(int port) {
-    // TODO bind to v4 and v6
     // TODO take existing socket
-    // TODO use the acceptor thread
-    auto socket = folly::AsyncServerSocket::newSocket(
-      EventBaseManager::get()->getEventBase());
-    sockets_.push_back(socket);
-    socket->bind(port);
-
-    // TODO Take ServerSocketConfig
-    socket->listen(1024);
 
     if (!workerFactory_) {
       group(nullptr);
     }
 
+    bool reusePort = false;
+    if (acceptor_group_->numThreads() >= 0) {
+      reusePort = true;
+    }
+
+    std::mutex sock_lock;
+    std::vector<std::shared_ptr<folly::AsyncServerSocket>> new_sockets;
+
+    auto startupFunc = [&](std::shared_ptr<boost::barrier> barrier){
+        auto socket = folly::AsyncServerSocket::newSocket();
+        sock_lock.lock();
+        new_sockets.push_back(socket);
+        sock_lock.unlock();
+        socket->setReusePortEnabled(reusePort);
+        socket->attachEventBase(EventBaseManager::get()->getEventBase());
+        socket->bind(port);
+        // TODO Take ServerSocketConfig
+        socket->listen(1024);
+        socket->startAccepting();
+
+        if (port == 0) {
+          SocketAddress address;
+          socket->getAddress(&address);
+          port = address.getPort();
+        }
+
+        barrier->wait();
+    };
+
+    auto bind0 = std::make_shared<boost::barrier>(2);
+    acceptor_group_->add(std::bind(startupFunc, bind0));
+    bind0->wait();
+
+    auto barrier = std::make_shared<boost::barrier>(acceptor_group_->numThreads());
+    for (int i = 1; i < acceptor_group_->numThreads(); i++) {
+      acceptor_group_->add(std::bind(startupFunc, barrier));
+    }
+    barrier->wait();
+
     // Startup all the threads
-    workerFactory_->forEachWorker([this, socket](Acceptor* worker){
+    for(auto socket : new_sockets) {
+      workerFactory_->forEachWorker([this, socket](Acceptor* worker){
         socket->getEventBase()->runInEventBaseThread([this, worker, socket](){
           socket->addAcceptCallback(worker, worker->getEventBase());
         });
-    });
-    socket->startAccepting();
+      });
+    }
+
+    for (auto& socket : new_sockets) {
+      sockets_.push_back(socket);
+    }
   }
 
   /*
    * Stop listening on all sockets.
    */
   void stop() {
-    for (auto& socket : sockets_) {
-      socket->stopAccepting();
+    auto barrier = std::make_shared<boost::barrier>(sockets_.size() + 1);
+    for (auto socket : sockets_) {
+      socket->getEventBase()->runInEventBaseThread([barrier, socket]() {
+        socket->stopAccepting();
+        socket->detachEventBase();
+        barrier->wait();
+      });
     }
+    barrier->wait();
+    sockets_.clear();
+
     acceptor_group_->join();
     io_group_->join();
   }