use ServerBootstrap
authorDave Watson <davejwatson@fb.com>
Thu, 19 Feb 2015 18:06:27 +0000 (10:06 -0800)
committerAlecs King <int@fb.com>
Tue, 3 Mar 2015 03:23:34 +0000 (19:23 -0800)
Summary:
Use server bootstrap.

* code reuse
* Allows multiple accept threads easily.

Accept now doesn't happen in the server's eventBase, but a new thread.

I think I hit all the corner cases involving existingSocket, duplex, stopListening(), etc.

There are a lot of minor changes here to support all the cases, let me know if I should split anything to separate diffs

Test Plan: fbconfig -r thrift/lib/cpp2; fbmake runtests

Reviewed By: hans@fb.com

Subscribers: haijunz, yfeldblum, ruibalp, fbcode-common-diffs@, davejwatson, jsedgwick, trunkagent, doug, alandau, bmatheny, mshneer, folly-diffs@, mcduff, andrewcox, fugalh

FB internal diff: D1732895

Tasks: 5788102

Signature: t1:1732895:1423087631:379bbb131c35ce2221801bc7cec226f87ba0b1d9

folly/Makefile.am
folly/wangle/acceptor/Acceptor.h
folly/wangle/bootstrap/BootstrapTest.cpp
folly/wangle/bootstrap/ServerBootstrap-inl.h
folly/wangle/bootstrap/ServerBootstrap.cpp
folly/wangle/bootstrap/ServerBootstrap.h

index a8d548281bc8d19b6b516a6c78c23e48b8600b3b..b273f71de449a66ef8cf0f954dded1bead13b421 100644 (file)
@@ -225,6 +225,9 @@ nobase_follyinclude_HEADERS = \
        wangle/acceptor/ServerSocketConfig.h \
        wangle/acceptor/SocketOptions.h \
        wangle/acceptor/TransportInfo.h \
+       wangle/bootstrap/ServerBootstrap.h \
+       wangle/bootstrap/ServerBootstrap-inl.h \
+       wangle/bootstrap/ClientBootstrap.h \
        wangle/channel/AsyncSocketHandler.h \
        wangle/channel/ChannelHandler.h \
        wangle/channel/ChannelHandlerContext.h \
@@ -343,6 +346,7 @@ libfolly_la_SOURCES = \
        wangle/acceptor/ManagedConnection.cpp \
        wangle/acceptor/SocketOptions.cpp \
        wangle/acceptor/TransportInfo.cpp \
+       wangle/bootstrap/ServerBootstrap.cpp \
        wangle/concurrent/CPUThreadPoolExecutor.cpp \
        wangle/concurrent/Codel.cpp \
        wangle/concurrent/IOThreadPoolExecutor.cpp \
index 22c467197ac68fb1ffe514938bffcfad8328c556..ee73b4715e1e1fd88b5bb1fc1ca8c4aec5ee05d8 100644 (file)
@@ -180,6 +180,14 @@ class Acceptor :
    */
   void drainAllConnections();
 
+  /**
+   * Drop all connections.
+   *
+   * forceStop() schedules dropAllConnections() to be called in the acceptor's
+   * thread.
+   */
+  void dropAllConnections();
+
  protected:
   friend class AcceptorHandshakeHelper;
 
@@ -238,14 +246,6 @@ class Acceptor :
       std::chrono::milliseconds acceptLatency,
       SSLErrorEnum error) noexcept {}
 
-  /**
-   * Drop all connections.
-   *
-   * forceStop() schedules dropAllConnections() to be called in the acceptor's
-   * thread.
-   */
-  void dropAllConnections();
-
  protected:
 
   /**
@@ -340,7 +340,7 @@ class Acceptor :
 
 class AcceptorFactory {
  public:
-  virtual std::shared_ptr<Acceptor> newAcceptor() = 0;
+  virtual std::shared_ptr<Acceptor> newAcceptor(folly::EventBase*) = 0;
   virtual ~AcceptorFactory() = default;
 };
 
index 9f2f664e771c761890a72fd0087d60b874e1d3d4..4bbd80c22f22a5442be0e84377e1ebe1f76598b8 100644 (file)
@@ -20,6 +20,7 @@
 
 #include <glog/logging.h>
 #include <gtest/gtest.h>
+#include <boost/thread.hpp>
 
 using namespace folly::wangle;
 using namespace folly;
@@ -227,3 +228,11 @@ TEST(Bootstrap, SharedThreadPool) {
   server.stop();
   CHECK(factory->pipelines == 5);
 }
+
+TEST(Bootstrap, ExistingSocket) {
+  TestServer server;
+  auto factory = std::make_shared<TestPipelineFactory>();
+  server.childPipeline(factory);
+  folly::AsyncServerSocket::UniquePtr socket(new AsyncServerSocket);
+  server.bind(std::move(socket));
+}
index a224a9f915ff25e5cf0f9871412338b602208f80..acaa29be6ad6e307db2b8fd080822286cdea3481 100644 (file)
@@ -53,10 +53,11 @@ class ServerAcceptor : public Acceptor {
 
  public:
   explicit ServerAcceptor(
-    std::shared_ptr<PipelineFactory<Pipeline>> pipelineFactory)
+    std::shared_ptr<PipelineFactory<Pipeline>> pipelineFactory,
+    EventBase* base)
       : Acceptor(ServerSocketConfig())
       , pipelineFactory_(pipelineFactory) {
-    Acceptor::init(nullptr, &base_);
+    Acceptor::init(nullptr, base);
   }
 
   /* See Acceptor::onNewConnection for details */
@@ -74,13 +75,7 @@ class ServerAcceptor : public Acceptor {
     Acceptor::addConnection(connection);
   }
 
-  ~ServerAcceptor() {
-    Acceptor::dropAllConnections();
-  }
-
  private:
-  EventBase base_;
-
   std::shared_ptr<PipelineFactory<Pipeline>> pipelineFactory_;
 };
 
@@ -91,41 +86,51 @@ class ServerAcceptorFactory : public AcceptorFactory {
       std::shared_ptr<PipelineFactory<Pipeline>> factory)
     : factory_(factory) {}
 
-  std::shared_ptr<Acceptor> newAcceptor() {
-    return std::make_shared<ServerAcceptor<Pipeline>>(factory_);
+  std::shared_ptr<Acceptor> newAcceptor(folly::EventBase* base) {
+    return std::make_shared<ServerAcceptor<Pipeline>>(factory_, base);
   }
  private:
   std::shared_ptr<PipelineFactory<Pipeline>> factory_;
 };
 
-class ServerWorkerFactory : public folly::wangle::ThreadFactory {
+class ServerWorkerPool : public folly::wangle::ThreadPoolExecutor::Observer {
  public:
-  explicit ServerWorkerFactory(std::shared_ptr<AcceptorFactory> acceptorFactory)
-      : internalFactory_(
-        std::make_shared<folly::wangle::NamedThreadFactory>("BootstrapWorker"))
-      , acceptorFactory_(acceptorFactory)
-    {}
-  virtual std::thread newThread(folly::Func&& func) override;
-
-  void setInternalFactory(
-    std::shared_ptr<folly::wangle::NamedThreadFactory> internalFactory);
-  void setNamePrefix(folly::StringPiece prefix);
+  explicit ServerWorkerPool(
+    std::shared_ptr<AcceptorFactory> acceptorFactory,
+    folly::wangle::IOThreadPoolExecutor* exec,
+    std::vector<std::shared_ptr<folly::AsyncServerSocket>>* sockets)
+      : acceptorFactory_(acceptorFactory)
+      , exec_(exec)
+      , sockets_(sockets) {
+    CHECK(exec);
+  }
 
   template <typename F>
-  void forEachWorker(F&& f);
+  void forEachWorker(F&& f) const;
+
+  void threadStarted(
+    folly::wangle::ThreadPoolExecutor::ThreadHandle*);
+  void threadStopped(
+    folly::wangle::ThreadPoolExecutor::ThreadHandle*);
+  void threadPreviouslyStarted(
+      folly::wangle::ThreadPoolExecutor::ThreadHandle* thread) {
+    threadStarted(thread);
+  }
+  void threadNotYetStopped(
+      folly::wangle::ThreadPoolExecutor::ThreadHandle* thread) {
+    threadStopped(thread);
+  }
 
  private:
-  std::shared_ptr<folly::wangle::NamedThreadFactory> internalFactory_;
-  folly::RWSpinLock workersLock_;
-  std::map<int32_t, std::shared_ptr<Acceptor>> workers_;
-  int32_t nextWorkerId_{0};
-
+  std::map<folly::wangle::ThreadPoolExecutor::ThreadHandle*,
+           std::shared_ptr<Acceptor>> workers_;
   std::shared_ptr<AcceptorFactory> acceptorFactory_;
+  folly::wangle::IOThreadPoolExecutor* exec_{nullptr};
+  std::vector<std::shared_ptr<folly::AsyncServerSocket>>* sockets_;
 };
 
 template <typename F>
-void ServerWorkerFactory::forEachWorker(F&& f) {
-  folly::RWSpinLock::ReadHolder guard(workersLock_);
+void ServerWorkerPool::forEachWorker(F&& f) const {
   for (const auto& kv : workers_) {
     f(kv.second.get());
   }
index 1f07fadf7e6fa2995520b9b055fc16019cf23b7a..dc7d5d1e6d3eed7f568fc2496bb85c3fa5f050d7 100644 (file)
 
 namespace folly {
 
-std::thread ServerWorkerFactory::newThread(
-    folly::Func&& func) {
-  auto id = nextWorkerId_++;
-  auto worker = acceptorFactory_->newAcceptor();
-  {
-    folly::RWSpinLock::WriteHolder guard(workersLock_);
-    workers_.insert({id, worker});
+void ServerWorkerPool::threadStarted(
+  folly::wangle::ThreadPoolExecutor::ThreadHandle* h) {
+  auto worker = acceptorFactory_->newAcceptor(exec_->getEventBase(h));
+  workers_.insert({h, worker});
+
+  for(auto socket : *sockets_) {
+    socket->getEventBase()->runInEventBaseThread([this, worker, socket](){
+      socket->addAcceptCallback(worker.get(), worker->getEventBase());
+    });
   }
-  return internalFactory_->newThread([=](){
-    EventBaseManager::get()->setEventBase(worker->getEventBase(), false);
-    func();
-    EventBaseManager::get()->clearEventBase();
-
-    worker->drainAllConnections();
-    {
-      folly::RWSpinLock::WriteHolder guard(workersLock_);
-      workers_.erase(id);
-    }
-  });
 }
 
-void ServerWorkerFactory::setInternalFactory(
-  std::shared_ptr<wangle::NamedThreadFactory> internalFactory) {
-  CHECK(workers_.empty());
-  internalFactory_ = internalFactory;
-}
+void ServerWorkerPool::threadStopped(
+  folly::wangle::ThreadPoolExecutor::ThreadHandle* h) {
+  auto worker = workers_.find(h);
+  CHECK(worker != workers_.end());
+
+  for (auto& socket : *sockets_) {
+    folly::Baton<> barrier;
+    socket->getEventBase()->runInEventBaseThread([&]() {
+      socket->removeAcceptCallback(worker->second.get(), nullptr);
+      barrier.post();
+    });
+    barrier.wait();
+  }
+
+  CHECK(worker->second->getEventBase() != nullptr);
+  CHECK(!worker->second->getEventBase()->isInEventBaseThread());
+  folly::Baton<> barrier;
+  worker->second->getEventBase()->runInEventBaseThread([&]() {
+      worker->second->dropAllConnections();
+      barrier.post();
+  });
 
-void ServerWorkerFactory::setNamePrefix(folly::StringPiece prefix) {
-  CHECK(workers_.empty());
-  internalFactory_->setNamePrefix(prefix);
+  barrier.wait();
+  workers_.erase(worker);
 }
 
 } // namespace
index dd5cbc566deaef9c9a99e8e7a9b2c750233ad63e..142c48713a06cf478de67925970b67fc230b0dad 100644 (file)
@@ -16,7 +16,7 @@
 #pragma once
 
 #include <folly/wangle/bootstrap/ServerBootstrap-inl.h>
-#include <boost/thread.hpp>
+#include <folly/Baton.h>
 
 namespace folly {
 
@@ -37,6 +37,10 @@ namespace folly {
 template <typename Pipeline>
 class ServerBootstrap {
  public:
+
+  ~ServerBootstrap() {
+    stop();
+  }
   /* TODO(davejwatson)
    *
    * If there is any work to be done BEFORE handing the work to IO
@@ -103,34 +107,65 @@ class ServerBootstrap {
       io_group = std::make_shared<folly::wangle::IOThreadPoolExecutor>(
         32, std::make_shared<wangle::NamedThreadFactory>("IO Thread"));
     }
-    auto factoryBase = io_group->getThreadFactory();
-    CHECK(factoryBase);
-    auto factory = std::dynamic_pointer_cast<folly::wangle::NamedThreadFactory>(
-      factoryBase);
-    CHECK(factory); // Must be named thread factory
 
     CHECK(acceptorFactory_ || pipelineFactory_);
 
     if (acceptorFactory_) {
-      workerFactory_ = std::make_shared<ServerWorkerFactory>(
-        acceptorFactory_);
+      workerFactory_ = std::make_shared<ServerWorkerPool>(
+        acceptorFactory_, io_group.get(), &sockets_);
     } else {
-      workerFactory_ = std::make_shared<ServerWorkerFactory>(
-        std::make_shared<ServerAcceptorFactory<Pipeline>>(pipelineFactory_));
+      workerFactory_ = std::make_shared<ServerWorkerPool>(
+        std::make_shared<ServerAcceptorFactory<Pipeline>>(pipelineFactory_),
+        io_group.get(), &sockets_);
     }
-    workerFactory_->setInternalFactory(factory);
+
+    io_group->addObserver(workerFactory_);
 
     acceptor_group_ = accept_group;
     io_group_ = io_group;
 
-    auto numThreads = io_group_->numThreads();
-    io_group_->setNumThreads(0);
-    io_group_->setThreadFactory(workerFactory_);
-    io_group_->setNumThreads(numThreads);
-
     return this;
   }
 
+  /*
+   * Bind to an existing socket
+   *
+   * @param sock Existing socket to use for accepting
+   */
+  void bind(folly::AsyncServerSocket::UniquePtr s) {
+    if (!workerFactory_) {
+      group(nullptr);
+    }
+
+    // Since only a single socket is given,
+    // we can only accept on a single thread
+    CHECK(acceptor_group_->numThreads() == 1);
+    std::shared_ptr<folly::AsyncServerSocket> socket(
+      s.release(), DelayedDestruction::Destructor());
+
+    folly::Baton<> barrier;
+    acceptor_group_->add([&](){
+      socket->attachEventBase(EventBaseManager::get()->getEventBase());
+      socket->listen(1024);
+      socket->startAccepting();
+      barrier.post();
+    });
+    barrier.wait();
+
+    // Startup all the threads
+    workerFactory_->forEachWorker([this, socket](Acceptor* worker){
+      socket->getEventBase()->runInEventBaseThread([this, worker, socket](){
+        socket->addAcceptCallback(worker, worker->getEventBase());
+      });
+    });
+
+    sockets_.push_back(socket);
+  }
+
+  void bind(folly::SocketAddress address) {
+    bindImpl(-1, address);
+  }
+
   /*
    * Bind to a port and start listening.
    * One of childPipeline or childHandler must be called before bind
@@ -138,8 +173,11 @@ class ServerBootstrap {
    * @param port Port to listen on
    */
   void bind(int port) {
-    // TODO take existing socket
+    CHECK(port >= 0);
+    bindImpl(port, folly::SocketAddress());
+  }
 
+  void bindImpl(int port, folly::SocketAddress address) {
     if (!workerFactory_) {
       group(nullptr);
     }
@@ -152,16 +190,20 @@ class ServerBootstrap {
     std::mutex sock_lock;
     std::vector<std::shared_ptr<folly::AsyncServerSocket>> new_sockets;
 
-    auto startupFunc = [&](std::shared_ptr<boost::barrier> barrier){
+    auto startupFunc = [&](std::shared_ptr<folly::Baton<>> barrier){
         auto socket = folly::AsyncServerSocket::newSocket();
         sock_lock.lock();
         new_sockets.push_back(socket);
         sock_lock.unlock();
         socket->setReusePortEnabled(reusePort);
         socket->attachEventBase(EventBaseManager::get()->getEventBase());
-        socket->bind(port);
-        // TODO Take ServerSocketConfig
-        socket->listen(1024);
+        if (port >= 0) {
+          socket->bind(port);
+        } else {
+          socket->bind(address);
+          port = address.getPort();
+        }
+        socket->listen(socketConfig.acceptBacklog);
         socket->startAccepting();
 
         if (port == 0) {
@@ -170,18 +212,18 @@ class ServerBootstrap {
           port = address.getPort();
         }
 
-        barrier->wait();
+        barrier->post();
     };
 
-    auto bind0 = std::make_shared<boost::barrier>(2);
-    acceptor_group_->add(std::bind(startupFunc, bind0));
-    bind0->wait();
+    auto wait0 = std::make_shared<folly::Baton<>>();
+    acceptor_group_->add(std::bind(startupFunc, wait0));
+    wait0->wait();
 
-    auto barrier = std::make_shared<boost::barrier>(acceptor_group_->numThreads());
     for (size_t i = 1; i < acceptor_group_->numThreads(); i++) {
+      auto barrier = std::make_shared<folly::Baton<>>();
       acceptor_group_->add(std::bind(startupFunc, barrier));
+      barrier->wait();
     }
-    barrier->wait();
 
     // Startup all the threads
     for(auto socket : new_sockets) {
@@ -201,34 +243,49 @@ class ServerBootstrap {
    * Stop listening on all sockets.
    */
   void stop() {
-    auto barrier = std::make_shared<boost::barrier>(sockets_.size() + 1);
     for (auto socket : sockets_) {
-      socket->getEventBase()->runInEventBaseThread([barrier, socket]() {
+      folly::Baton<> barrier;
+      socket->getEventBase()->runInEventBaseThread([&barrier, socket]() {
         socket->stopAccepting();
         socket->detachEventBase();
-        barrier->wait();
+        barrier.post();
       });
+      barrier.wait();
     }
-    barrier->wait();
     sockets_.clear();
 
-    acceptor_group_->join();
-    io_group_->join();
+    if (acceptor_group_) {
+      acceptor_group_->join();
+    }
+    if (io_group_) {
+      io_group_->join();
+    }
   }
 
   /*
    * Get the list of listening sockets
    */
-  std::vector<std::shared_ptr<folly::AsyncServerSocket>>&
-  getSockets() {
+  const std::vector<std::shared_ptr<folly::AsyncServerSocket>>&
+  getSockets() const {
     return sockets_;
   }
 
+  std::shared_ptr<wangle::IOThreadPoolExecutor> getIOGroup() const {
+    return io_group_;
+  }
+
+  template <typename F>
+  void forEachWorker(F&& f) const {
+    workerFactory_->forEachWorker(f);
+  }
+
+  ServerSocketConfig socketConfig;
+
  private:
   std::shared_ptr<wangle::IOThreadPoolExecutor> acceptor_group_;
   std::shared_ptr<wangle::IOThreadPoolExecutor> io_group_;
 
-  std::shared_ptr<ServerWorkerFactory> workerFactory_;
+  std::shared_ptr<ServerWorkerPool> workerFactory_;
   std::vector<std::shared_ptr<folly::AsyncServerSocket>> sockets_;
 
   std::shared_ptr<AcceptorFactory> acceptorFactory_;