httpserver on serverbootstrap
authorDave Watson <davejwatson@fb.com>
Thu, 5 Mar 2015 18:25:54 +0000 (10:25 -0800)
committerNicholas Ormrod <njormrod@fb.com>
Thu, 5 Mar 2015 22:44:22 +0000 (14:44 -0800)
Summary:
Cleans up the httpserver startup code nicely.  The only major change to ServerBootstrap was a check if bind failed to throw an exception.

(depends on D1732895)

Test Plan:
fbconfig -r folly/wangle/bootstrap proxygen/httpserver; fbmake runtests

fbconfig -r dfsrouter; fbmake runtests

Reviewed By: hans@fb.com

Subscribers: yfeldblum, cgheorghe, trunkagent, doug, fugalh, bmatheny, folly-diffs@, jsedgwick

FB internal diff: D1800100

Signature: t1:1800100:1424733970:67a61a22d2affadea16d2fd725003915326077b2

folly/wangle/acceptor/Acceptor.h
folly/wangle/bootstrap/ServerBootstrap.h
folly/wangle/channel/ChannelPipeline.h
folly/wangle/concurrent/IOThreadPoolExecutor.h
folly/wangle/concurrent/ThreadPoolExecutor.h

index ee73b4715e1e1fd88b5bb1fc1ca8c4aec5ee05d8..7d7269c502156bbb343c92df98a05450820039c1 100644 (file)
@@ -92,7 +92,7 @@ class Acceptor :
    */
   uint32_t getNumConnections() const {
     return downstreamConnectionManager_ ?
-        downstreamConnectionManager_->getNumConnections() : 0;
+      (uint32_t)downstreamConnectionManager_->getNumConnections() : 0;
   }
 
   /**
index 925a9279c8468fccaf434db9e2a731a354f093f8..5a65186da013e9e4c3b4e7e2ff15198850f0816a 100644 (file)
@@ -20,6 +20,9 @@
 
 namespace folly {
 
+typedef folly::wangle::ChannelPipeline<
+  folly::IOBufQueue&, std::unique_ptr<folly::IOBuf>> DefaultPipeline;
+
 /*
  * ServerBootstrap is a parent class intended to set up a
  * high-performance TCP accepting server.  It will manage a pool of
@@ -60,8 +63,8 @@ class ServerBootstrap {
    *
    * @param childHandler - acceptor factory to call for each IO thread
    */
-  ServerBootstrap* childHandler(std::shared_ptr<AcceptorFactory> childHandler) {
-    acceptorFactory_ = childHandler;
+  ServerBootstrap* childHandler(std::shared_ptr<AcceptorFactory> h) {
+    acceptorFactory_ = h;
     return this;
   }
 
@@ -162,7 +165,7 @@ class ServerBootstrap {
     sockets_.push_back(socket);
   }
 
-  void bind(folly::SocketAddress address) {
+  void bind(folly::SocketAddress& address) {
     bindImpl(-1, address);
   }
 
@@ -174,10 +177,11 @@ class ServerBootstrap {
    */
   void bind(int port) {
     CHECK(port >= 0);
-    bindImpl(port, folly::SocketAddress());
+    folly::SocketAddress address;
+    bindImpl(port, address);
   }
 
-  void bindImpl(int port, folly::SocketAddress address) {
+  void bindImpl(int port, folly::SocketAddress& address) {
     if (!workerFactory_) {
       group(nullptr);
     }
@@ -190,24 +194,35 @@ class ServerBootstrap {
     std::mutex sock_lock;
     std::vector<std::shared_ptr<folly::AsyncServerSocket>> new_sockets;
 
+    std::exception_ptr exn;
+
     auto startupFunc = [&](std::shared_ptr<folly::Baton<>> barrier){
         auto socket = folly::AsyncServerSocket::newSocket();
-        sock_lock.lock();
-        new_sockets.push_back(socket);
-        sock_lock.unlock();
         socket->setReusePortEnabled(reusePort);
         socket->attachEventBase(EventBaseManager::get()->getEventBase());
-        if (port >= 0) {
-          socket->bind(port);
-        } else {
-          socket->bind(address);
-          port = address.getPort();
+
+        try {
+          if (port >= 0) {
+            socket->bind(port);
+          } else {
+            socket->bind(address);
+            port = address.getPort();
+          }
+
+          socket->listen(socketConfig.acceptBacklog);
+          socket->startAccepting();
+        } catch (...) {
+          exn = std::current_exception();
+          barrier->post();
+
+          return;
         }
-        socket->listen(socketConfig.acceptBacklog);
-        socket->startAccepting();
+
+        sock_lock.lock();
+        new_sockets.push_back(socket);
+        sock_lock.unlock();
 
         if (port == 0) {
-          SocketAddress address;
           socket->getAddress(&address);
           port = address.getPort();
         }
@@ -225,6 +240,10 @@ class ServerBootstrap {
       barrier->wait();
     }
 
+    if (exn) {
+      std::rethrow_exception(exn);
+    }
+
     // Startup all the threads
     for(auto socket : new_sockets) {
       workerFactory_->forEachWorker([this, socket](Acceptor* worker){
index 02cfb603dd5c3a355a882e17e88541d704564d34..f7918370f5e1ff4003bad9620b96e4015379b3b2 100644 (file)
@@ -282,8 +282,8 @@ class ChannelPipeline<R, W, Handler, Handlers...>
     ChannelPipeline<R, W, Handlers...>::finalizeHelper();
     back_ = ChannelPipeline<R, W, Handlers...>::back_;
     if (!back_) {
-      auto is_end = ChannelPipeline<R, W, Handlers...>::is_end;
-      CHECK(is_end);
+      auto is_at_end = ChannelPipeline<R, W, Handlers...>::is_end;
+      CHECK(is_at_end);
       back_ = dynamic_cast<OutboundChannelHandlerContext<W>*>(&ctx_);
       if (!back_) {
         throw std::invalid_argument("wrong type for last handler");
index ddf8dfe91ce06604dbd3e9596abfda9d4792c725..f3c5865f36c8eb70b8d2e5f8a27e7066d6d4f9bf 100644 (file)
@@ -41,7 +41,7 @@ class IOThreadPoolExecutor : public ThreadPoolExecutor, public IOExecutor {
 
   EventBase* getEventBase() override;
 
-  EventBase* getEventBase(ThreadPoolExecutor::ThreadHandle*);
+  static EventBase* getEventBase(ThreadPoolExecutor::ThreadHandle*);
 
  private:
   struct FOLLY_ALIGN_TO_AVOID_FALSE_SHARING IOThread : public Thread {
index a0a0b5872f502eedb3d829c776b2a06eb789154a..32a0330da9f59d50afa59b8a90e1d6d2d4dc0721 100644 (file)
@@ -104,8 +104,12 @@ class ThreadPoolExecutor : public virtual Executor {
    public:
     virtual void threadStarted(ThreadHandle*) = 0;
     virtual void threadStopped(ThreadHandle*) = 0;
-    virtual void threadPreviouslyStarted(ThreadHandle*) = 0;
-    virtual void threadNotYetStopped(ThreadHandle*) = 0;
+    virtual void threadPreviouslyStarted(ThreadHandle* h) {
+      threadStarted(h);
+    }
+    virtual void threadNotYetStopped(ThreadHandle* h) {
+      threadStopped(h);
+    }
     virtual ~Observer() = default;
   };