move asyncserversocket to folly
authorDave Watson <davejwatson@fb.com>
Thu, 25 Sep 2014 17:17:20 +0000 (10:17 -0700)
committerdcsommer <dcsommer@fb.com>
Wed, 29 Oct 2014 23:04:24 +0000 (16:04 -0700)
Summary:
Changes:
* namespace to folly
* some std::chrono replacesments
* Moves shutdownSocketSet stuff to thriftserver instead of in the socket itself.
* Changes exception types.  I don't see anywhere that uses the TAsync*SERVER*socket exceptions depending on the TTransport type, but I could be wrong? I don't really know what to do about the exception types

unittests still postponed overnight.

Test Plan:
fbconfig -r thrift; fbmake runtests
fbconfig -r proxygen; fbmake runtests

Reviewed By: dcsommer@fb.com

Subscribers: hphp-diffs@, ps, folly-diffs@, anca, trunkagent, jsedgwick, fugalh, doug, alandau, bmatheny, njormrod

FB internal diff: D1579187

folly/Makefile.am
folly/io/async/AsyncServerSocket.cpp [new file with mode: 0644]
folly/io/async/AsyncServerSocket.h [new file with mode: 0644]

index 240824d5c4b0d9e8a19a0d975aea1137e3547b0c..f1ca4ac534bfe8f4f73f6c6998cf32aceedc9d4c 100644 (file)
@@ -130,6 +130,7 @@ nobase_follyinclude_HEADERS = \
        io/TypedIOBuf.h \
        io/ShutdownSocketSet.h \
        io/async/AsyncTimeout.h \
+       io/async/AsyncServerSocket.h \
        io/async/DelayedDestruction.h \
        io/async/EventBase.h \
        io/async/EventBaseManager.h \
@@ -256,6 +257,7 @@ libfolly_la_SOURCES = \
        io/RecordIO.cpp \
        io/ShutdownSocketSet.cpp \
        io/async/AsyncTimeout.cpp \
+       io/async/AsyncServerSocket.cpp \
        io/async/EventBase.cpp \
        io/async/EventBaseManager.cpp \
        io/async/EventHandler.cpp \
diff --git a/folly/io/async/AsyncServerSocket.cpp b/folly/io/async/AsyncServerSocket.cpp
new file mode 100644 (file)
index 0000000..7dce10c
--- /dev/null
@@ -0,0 +1,861 @@
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __STDC_FORMAT_MACROS
+  #define __STDC_FORMAT_MACROS
+#endif
+
+#include <folly/io/async/AsyncServerSocket.h>
+
+#include <folly/io/async/EventBase.h>
+#include <folly/io/async/NotificationQueue.h>
+#include <folly/SocketAddress.h>
+
+#include <errno.h>
+#include <string.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/tcp.h>
+
+namespace folly {
+
+const uint32_t AsyncServerSocket::kDefaultMaxAcceptAtOnce;
+const uint32_t AsyncServerSocket::kDefaultCallbackAcceptAtOnce;
+const uint32_t AsyncServerSocket::kDefaultMaxMessagesInQueue;
+
+int setCloseOnExec(int fd, int value) {
+  // Read the current flags
+  int old_flags = fcntl(fd, F_GETFD, 0);
+
+  // If reading the flags failed, return error indication now
+  if (old_flags < 0)
+    return -1;
+
+  // Set just the flag we want to set
+  int new_flags;
+  if (value != 0)
+    new_flags = old_flags | FD_CLOEXEC;
+  else
+    new_flags = old_flags & ~FD_CLOEXEC;
+
+  // Store modified flag word in the descriptor
+  return fcntl(fd, F_SETFD, new_flags);
+}
+
+void AsyncServerSocket::RemoteAcceptor::start(
+  EventBase* eventBase, uint32_t maxAtOnce, uint32_t maxInQueue) {
+  setMaxReadAtOnce(maxAtOnce);
+  queue_.setMaxQueueSize(maxInQueue);
+
+  if (!eventBase->runInEventBaseThread([=](){
+        callback_->acceptStarted();
+        this->startConsuming(eventBase, &queue_);
+      })) {
+    throw std::invalid_argument("unable to start waiting on accept "
+                            "notification queue in the specified "
+                            "EventBase thread");
+  }
+}
+
+void AsyncServerSocket::RemoteAcceptor::stop(
+  EventBase* eventBase, AcceptCallback* callback) {
+  if (!eventBase->runInEventBaseThread([=](){
+        callback->acceptStopped();
+        delete this;
+      })) {
+    throw std::invalid_argument("unable to start waiting on accept "
+                            "notification queue in the specified "
+                            "EventBase thread");
+  }
+}
+
+void AsyncServerSocket::RemoteAcceptor::messageAvailable(
+  QueueMessage&& msg) {
+
+  switch (msg.type) {
+    case MessageType::MSG_NEW_CONN:
+    {
+      callback_->connectionAccepted(msg.fd, msg.address);
+      break;
+    }
+    case MessageType::MSG_ERROR:
+    {
+      std::runtime_error ex(msg.msg);
+      callback_->acceptError(ex);
+      break;
+    }
+    default:
+    {
+      LOG(ERROR) << "invalid accept notification message type "
+                 << int(msg.type);
+      std::runtime_error ex(
+        "received invalid accept notification message type");
+      callback_->acceptError(ex);
+    }
+  }
+}
+
+/*
+ * AsyncServerSocket::BackoffTimeout
+ */
+class AsyncServerSocket::BackoffTimeout : public AsyncTimeout {
+ public:
+  BackoffTimeout(AsyncServerSocket* socket)
+    : AsyncTimeout(socket->getEventBase()),
+      socket_(socket) {}
+
+  virtual void timeoutExpired() noexcept {
+    socket_->backoffTimeoutExpired();
+  }
+
+ private:
+  AsyncServerSocket* socket_;
+};
+
+/*
+ * AsyncServerSocket methods
+ */
+
+AsyncServerSocket::AsyncServerSocket(EventBase* eventBase)
+:   eventBase_(eventBase),
+    accepting_(false),
+    maxAcceptAtOnce_(kDefaultMaxAcceptAtOnce),
+    maxNumMsgsInQueue_(kDefaultMaxMessagesInQueue),
+    acceptRateAdjustSpeed_(0),
+    acceptRate_(1),
+    lastAccepTimestamp_(std::chrono::steady_clock::now()),
+    numDroppedConnections_(0),
+    callbackIndex_(0),
+    backoffTimeout_(nullptr),
+    callbacks_(),
+    keepAliveEnabled_(true),
+    closeOnExec_(true),
+    shutdownSocketSet_(nullptr) {
+}
+
+void AsyncServerSocket::setShutdownSocketSet(ShutdownSocketSet* newSS) {
+  if (shutdownSocketSet_ == newSS) {
+    return;
+  }
+  if (shutdownSocketSet_) {
+    for (auto& h : sockets_) {
+      shutdownSocketSet_->remove(h.socket_);
+    }
+  }
+  shutdownSocketSet_ = newSS;
+  if (shutdownSocketSet_) {
+    for (auto& h : sockets_) {
+      shutdownSocketSet_->add(h.socket_);
+    }
+  }
+}
+
+AsyncServerSocket::~AsyncServerSocket() {
+  assert(callbacks_.empty());
+}
+
+int AsyncServerSocket::stopAccepting(int shutdownFlags) {
+  int result = 0;
+  for (auto& handler : sockets_) {
+    VLOG(10) << "AsyncServerSocket::stopAccepting " << this <<
+              handler.socket_;
+  }
+  assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
+
+  // When destroy is called, unregister and close the socket immediately
+  accepting_ = false;
+
+  for (auto& handler : sockets_) {
+    handler.unregisterHandler();
+    if (shutdownSocketSet_) {
+      shutdownSocketSet_->close(handler.socket_);
+    } else if (shutdownFlags >= 0) {
+      result = ::shutdown(handler.socket_, shutdownFlags);
+      pendingCloseSockets_.push_back(handler.socket_);
+    } else {
+      ::close(handler.socket_);
+    }
+  }
+  sockets_.clear();
+
+  // Destroy the backoff timout.  This will cancel it if it is running.
+  delete backoffTimeout_;
+  backoffTimeout_ = nullptr;
+
+  // Close all of the callback queues to notify them that they are being
+  // destroyed.  No one should access the AsyncServerSocket any more once
+  // destroy() is called.  However, clear out callbacks_ before invoking the
+  // accept callbacks just in case.  This will potentially help us detect the
+  // bug if one of the callbacks calls addAcceptCallback() or
+  // removeAcceptCallback().
+  std::vector<CallbackInfo> callbacksCopy;
+  callbacks_.swap(callbacksCopy);
+  for (std::vector<CallbackInfo>::iterator it = callbacksCopy.begin();
+       it != callbacksCopy.end();
+       ++it) {
+    it->consumer->stop(it->eventBase, it->callback);
+  }
+
+  return result;
+}
+
+void AsyncServerSocket::destroy() {
+  stopAccepting();
+  for (auto s: pendingCloseSockets_) {
+    ::close(s);
+  }
+  // Then call DelayedDestruction::destroy() to take care of
+  // whether or not we need immediate or delayed destruction
+  DelayedDestruction::destroy();
+}
+
+void AsyncServerSocket::attachEventBase(EventBase *eventBase) {
+  assert(eventBase_ == nullptr);
+  assert(eventBase->isInEventBaseThread());
+
+  eventBase_ = eventBase;
+  for (auto& handler : sockets_) {
+    handler.attachEventBase(eventBase);
+  }
+}
+
+void AsyncServerSocket::detachEventBase() {
+  assert(eventBase_ != nullptr);
+  assert(eventBase_->isInEventBaseThread());
+  assert(!accepting_);
+
+  eventBase_ = nullptr;
+  for (auto& handler : sockets_) {
+    handler.detachEventBase();
+  }
+}
+
+void AsyncServerSocket::useExistingSockets(const std::vector<int>& fds) {
+  assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
+
+  if (sockets_.size() > 0) {
+    throw std::invalid_argument(
+                              "cannot call useExistingSocket() on a "
+                              "AsyncServerSocket that already has a socket");
+  }
+
+  for (auto fd: fds) {
+    // Set addressFamily_ from this socket.
+    // Note that the socket may not have been bound yet, but
+    // setFromLocalAddress() will still work and get the correct address family.
+    // We will update addressFamily_ again anyway if bind() is called later.
+    SocketAddress address;
+    address.setFromLocalAddress(fd);
+
+    setupSocket(fd);
+    sockets_.push_back(
+      ServerEventHandler(eventBase_, fd, this, address.getFamily()));
+    sockets_.back().changeHandlerFD(fd);
+  }
+}
+
+void AsyncServerSocket::useExistingSocket(int fd) {
+  useExistingSockets({fd});
+}
+
+void AsyncServerSocket::bind(const SocketAddress& address) {
+  assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
+
+  // useExistingSocket() may have been called to initialize socket_ already.
+  // However, in the normal case we need to create a new socket now.
+  // Don't set socket_ yet, so that socket_ will remain uninitialized if an
+  // error occurs.
+  int fd;
+  if (sockets_.size() == 0) {
+    fd = createSocket(address.getFamily());
+  } else if (sockets_.size() == 1) {
+    if (address.getFamily() != sockets_[0].addressFamily_) {
+      throw std::invalid_argument(
+                                "Attempted to bind address to socket with "
+                                "different address family");
+    }
+    fd = sockets_[0].socket_;
+  } else {
+    throw std::invalid_argument(
+                              "Attempted to bind to multiple fds");
+  }
+
+  // Bind to the socket
+  sockaddr_storage addrStorage;
+  address.getAddress(&addrStorage);
+  sockaddr* saddr = reinterpret_cast<sockaddr*>(&addrStorage);
+  if (::bind(fd, saddr, address.getActualSize()) != 0) {
+    if (sockets_.size() == 0) {
+      ::close(fd);
+    }
+    folly::throwSystemError(errno,
+                              "failed to bind to async server socket: " +
+                                address.describe());
+  }
+
+  // Record the address family that we are using,
+  // so we know how much address space we need to record accepted addresses.
+
+  // If we just created this socket, update the EventHandler and set socket_
+  if (sockets_.size() == 0) {
+    sockets_.push_back(
+      ServerEventHandler(eventBase_, fd, this, address.getFamily()));
+    sockets_[0].changeHandlerFD(fd);
+  }
+}
+
+void AsyncServerSocket::bind(uint16_t port) {
+  struct addrinfo hints, *res, *res0;
+  char sport[sizeof("65536")];
+
+  memset(&hints, 0, sizeof(hints));
+  hints.ai_family = AF_UNSPEC;
+  hints.ai_socktype = SOCK_STREAM;
+  hints.ai_flags = AI_PASSIVE;
+  snprintf(sport, sizeof(sport), "%u", port);
+
+  if (getaddrinfo(nullptr, sport, &hints, &res0)) {
+    throw std::invalid_argument(
+                              "Attempted to bind address to socket with "
+                              "bad getaddrinfo");
+  }
+
+  folly::ScopeGuard guard = folly::makeGuard([&]{
+      freeaddrinfo(res0);
+    });
+  DCHECK(&guard);
+
+  for (res = res0; res; res = res->ai_next) {
+    int s = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
+    // IPv6/IPv4 may not be supported by the kernel
+    if (s < 0 && errno == EAFNOSUPPORT) {
+      continue;
+    }
+    CHECK(s);
+
+    try {
+      setupSocket(s);
+    } catch (...) {
+      ::close(s);
+      throw;
+    }
+
+    if (res->ai_family == AF_INET6) {
+      int v6only = 1;
+      CHECK(0 == setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY,
+                            &v6only, sizeof(v6only)));
+    }
+
+    SocketAddress address;
+    address.setFromLocalAddress(s);
+
+    sockets_.push_back(
+      ServerEventHandler(eventBase_, s, this, address.getFamily()));
+
+    // Bind to the socket
+    if (::bind(s, res->ai_addr, res->ai_addrlen) != 0) {
+      folly::throwSystemError(
+        errno,
+        "failed to bind to async server socket for port");
+    }
+  }
+  if (sockets_.size() == 0) {
+    throw std::runtime_error(
+        "did not bind any async server socket for port");
+  }
+}
+
+void AsyncServerSocket::listen(int backlog) {
+  assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
+
+  // Start listening
+  for (auto& handler : sockets_) {
+    if (::listen(handler.socket_, backlog) == -1) {
+      folly::throwSystemError(errno,
+                                    "failed to listen on async server socket");
+    }
+  }
+}
+
+void AsyncServerSocket::getAddress(SocketAddress* addressReturn) const {
+  CHECK(sockets_.size() >= 1);
+  if (sockets_.size() > 1) {
+    VLOG(2) << "Warning: getAddress can return multiple addresses, " <<
+      "but getAddress was called, so only returning the first";
+  }
+  addressReturn->setFromLocalAddress(sockets_[0].socket_);
+}
+
+std::vector<SocketAddress> AsyncServerSocket::getAddresses()
+    const {
+  CHECK(sockets_.size() >= 1);
+  auto tsaVec = std::vector<SocketAddress>(sockets_.size());
+  auto tsaIter = tsaVec.begin();
+  for (const auto& socket : sockets_) {
+    (tsaIter++)->setFromLocalAddress(socket.socket_);
+  };
+  return tsaVec;
+}
+
+void AsyncServerSocket::addAcceptCallback(AcceptCallback *callback,
+                                           EventBase *eventBase,
+                                           uint32_t maxAtOnce) {
+  assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
+
+  // If this is the first accept callback and we are supposed to be accepting,
+  // start accepting once the callback is installed.
+  bool runStartAccepting = accepting_ && callbacks_.empty();
+
+  if (!eventBase) {
+    eventBase = eventBase_; // Run in AsyncServerSocket's eventbase
+  }
+
+  callbacks_.push_back(CallbackInfo(callback, eventBase));
+
+  // Start the remote acceptor.
+  //
+  // It would be nice if we could avoid starting the remote acceptor if
+  // eventBase == eventBase_.  However, that would cause issues if
+  // detachEventBase() and attachEventBase() were ever used to change the
+  // primary EventBase for the server socket.  Therefore we require the caller
+  // to specify a nullptr EventBase if they want to ensure that the callback is
+  // always invoked in the primary EventBase, and to be able to invoke that
+  // callback more efficiently without having to use a notification queue.
+  RemoteAcceptor* acceptor = nullptr;
+  try {
+    acceptor = new RemoteAcceptor(callback);
+    acceptor->start(eventBase, maxAtOnce, maxNumMsgsInQueue_);
+  } catch (...) {
+    callbacks_.pop_back();
+    delete acceptor;
+    throw;
+  }
+  callbacks_.back().consumer = acceptor;
+
+  // If this is the first accept callback and we are supposed to be accepting,
+  // start accepting.
+  if (runStartAccepting) {
+    startAccepting();
+  }
+}
+
+void AsyncServerSocket::removeAcceptCallback(AcceptCallback *callback,
+                                              EventBase *eventBase) {
+  assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
+
+  // Find the matching AcceptCallback.
+  // We just do a simple linear search; we don't expect removeAcceptCallback()
+  // to be called frequently, and we expect there to only be a small number of
+  // callbacks anyway.
+  std::vector<CallbackInfo>::iterator it = callbacks_.begin();
+  uint32_t n = 0;
+  while (true) {
+    if (it == callbacks_.end()) {
+      throw std::runtime_error("AsyncServerSocket::removeAcceptCallback(): "
+                              "accept callback not found");
+    }
+    if (it->callback == callback &&
+        (it->eventBase == eventBase || eventBase == nullptr)) {
+      break;
+    }
+    ++it;
+    ++n;
+  }
+
+  // Remove this callback from callbacks_.
+  //
+  // Do this before invoking the acceptStopped() callback, in case
+  // acceptStopped() invokes one of our methods that examines callbacks_.
+  //
+  // Save a copy of the CallbackInfo first.
+  CallbackInfo info(*it);
+  callbacks_.erase(it);
+  if (n < callbackIndex_) {
+    // We removed an element before callbackIndex_.  Move callbackIndex_ back
+    // one step, since things after n have been shifted back by 1.
+    --callbackIndex_;
+  } else {
+    // We removed something at or after callbackIndex_.
+    // If we removed the last element and callbackIndex_ was pointing at it,
+    // we need to reset callbackIndex_ to 0.
+    if (callbackIndex_ >= callbacks_.size()) {
+      callbackIndex_ = 0;
+    }
+  }
+
+  info.consumer->stop(info.eventBase, info.callback);
+
+  // If we are supposed to be accepting but the last accept callback
+  // was removed, unregister for events until a callback is added.
+  if (accepting_ && callbacks_.empty()) {
+    for (auto& handler : sockets_) {
+      handler.unregisterHandler();
+    }
+  }
+}
+
+void AsyncServerSocket::startAccepting() {
+  assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
+
+  accepting_ = true;
+  if (callbacks_.empty()) {
+    // We can't actually begin accepting if no callbacks are defined.
+    // Wait until a callback is added to start accepting.
+    return;
+  }
+
+  for (auto& handler : sockets_) {
+    if (!handler.registerHandler(
+          EventHandler::READ | EventHandler::PERSIST)) {
+      throw std::runtime_error("failed to register for accept events");
+    }
+  }
+}
+
+void AsyncServerSocket::pauseAccepting() {
+  assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
+  accepting_ = false;
+  for (auto& handler : sockets_) {
+   handler. unregisterHandler();
+  }
+
+  // If we were in the accept backoff state, disable the backoff timeout
+  if (backoffTimeout_) {
+    backoffTimeout_->cancelTimeout();
+  }
+}
+
+int AsyncServerSocket::createSocket(int family) {
+  int fd = socket(family, SOCK_STREAM, 0);
+  if (fd == -1) {
+    folly::throwSystemError(errno, "error creating async server socket");
+  }
+
+  try {
+    setupSocket(fd);
+  } catch (...) {
+    ::close(fd);
+    throw;
+  }
+  return fd;
+}
+
+void AsyncServerSocket::setupSocket(int fd) {
+  // Get the address family
+  SocketAddress address;
+  address.setFromLocalAddress(fd);
+  auto family = address.getFamily();
+
+  // Put the socket in non-blocking mode
+  if (fcntl(fd, F_SETFL, O_NONBLOCK) != 0) {
+    folly::throwSystemError(errno,
+                            "failed to put socket in non-blocking mode");
+  }
+
+  // Set reuseaddr to avoid 2MSL delay on server restart
+  int one = 1;
+  if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) != 0) {
+    // This isn't a fatal error; just log an error message and continue
+    LOG(ERROR) << "failed to set SO_REUSEADDR on async server socket " << errno;
+  }
+
+  // Set keepalive as desired
+  int zero = 0;
+  if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE,
+                 (keepAliveEnabled_) ? &one : &zero, sizeof(int)) != 0) {
+    LOG(ERROR) << "failed to set SO_KEEPALIVE on async server socket: " <<
+            strerror(errno);
+  }
+
+  // Setup FD_CLOEXEC flag
+  if (closeOnExec_ &&
+      (-1 == folly::setCloseOnExec(fd, closeOnExec_))) {
+    LOG(ERROR) << "failed to set FD_CLOEXEC on async server socket: " <<
+            strerror(errno);
+  }
+
+  // Set TCP nodelay if available, MAC OS X Hack
+  // See http://lists.danga.com/pipermail/memcached/2005-March/001240.html
+#ifndef TCP_NOPUSH
+  if (family != AF_UNIX) {
+    if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one)) != 0) {
+      // This isn't a fatal error; just log an error message and continue
+      LOG(ERROR) << "failed to set TCP_NODELAY on async server socket: " <<
+              strerror(errno);
+    }
+  }
+#endif
+
+  if (shutdownSocketSet_) {
+    shutdownSocketSet_->add(fd);
+  }
+}
+
+void AsyncServerSocket::handlerReady(
+  uint16_t events, int fd, sa_family_t addressFamily) noexcept {
+  assert(!callbacks_.empty());
+  DestructorGuard dg(this);
+
+  // Only accept up to maxAcceptAtOnce_ connections at a time,
+  // to avoid starving other I/O handlers using this EventBase.
+  for (uint32_t n = 0; n < maxAcceptAtOnce_; ++n) {
+    SocketAddress address;
+
+    sockaddr_storage addrStorage;
+    socklen_t addrLen = sizeof(addrStorage);
+    sockaddr* saddr = reinterpret_cast<sockaddr*>(&addrStorage);
+
+    // In some cases, accept() doesn't seem to update these correctly.
+    saddr->sa_family = addressFamily;
+    if (addressFamily == AF_UNIX) {
+      addrLen = sizeof(struct sockaddr_un);
+    }
+
+    // Accept a new client socket
+#ifdef SOCK_NONBLOCK
+    int clientSocket = accept4(fd, saddr, &addrLen, SOCK_NONBLOCK);
+#else
+    int clientSocket = accept(fd, saddr, &addrLen);
+#endif
+
+    address.setFromSockaddr(saddr, addrLen);
+
+    std::chrono::time_point<std::chrono::steady_clock> nowMs =
+      std::chrono::steady_clock::now();
+    int64_t timeSinceLastAccept = std::max(
+      int64_t(0),
+      nowMs.time_since_epoch().count() -
+      lastAccepTimestamp_.time_since_epoch().count());
+    lastAccepTimestamp_ = nowMs;
+    if (acceptRate_ < 1) {
+      acceptRate_ *= 1 + acceptRateAdjustSpeed_ * timeSinceLastAccept;
+      if (acceptRate_ >= 1) {
+        acceptRate_ = 1;
+      } else if (rand() > acceptRate_ * RAND_MAX) {
+        ++numDroppedConnections_;
+        if (clientSocket >= 0) {
+          ::close(clientSocket);
+        }
+        continue;
+      }
+    }
+
+    if (clientSocket < 0) {
+      if (errno == EAGAIN) {
+        // No more sockets to accept right now.
+        // Check for this code first, since it's the most common.
+        return;
+      } else if (errno == EMFILE || errno == ENFILE) {
+        // We're out of file descriptors.  Perhaps we're accepting connections
+        // too quickly. Pause accepting briefly to back off and give the server
+        // a chance to recover.
+        LOG(ERROR) << "accept failed: out of file descriptors; entering accept "
+                "back-off state";
+        enterBackoff();
+
+        // Dispatch the error message
+        dispatchError("accept() failed", errno);
+      } else {
+        dispatchError("accept() failed", errno);
+      }
+      return;
+    }
+
+#ifndef SOCK_NONBLOCK
+    // Explicitly set the new connection to non-blocking mode
+    if (fcntl(clientSocket, F_SETFL, O_NONBLOCK) != 0) {
+      ::close(clientSocket);
+      dispatchError("failed to set accepted socket to non-blocking mode",
+                    errno);
+      return;
+    }
+#endif
+
+    // Inform the callback about the new connection
+    dispatchSocket(clientSocket, std::move(address));
+
+    // If we aren't accepting any more, break out of the loop
+    if (!accepting_ || callbacks_.empty()) {
+      break;
+    }
+  }
+}
+
+void AsyncServerSocket::dispatchSocket(int socket,
+                                        SocketAddress&& address) {
+  uint32_t startingIndex = callbackIndex_;
+
+  // Short circuit if the callback is in the primary EventBase thread
+
+  CallbackInfo *info = nextCallback();
+  if (info->eventBase == nullptr) {
+    info->callback->connectionAccepted(socket, address);
+    return;
+  }
+
+  // Create a message to send over the notification queue
+  QueueMessage msg;
+  msg.type = MessageType::MSG_NEW_CONN;
+  msg.address = std::move(address);
+  msg.fd = socket;
+
+  // Loop until we find a free queue to write to
+  while (true) {
+    if (info->consumer->getQueue()->tryPutMessageNoThrow(std::move(msg))) {
+      // Success! return.
+      return;
+   }
+
+    // We couldn't add to queue.  Fall through to below
+
+    ++numDroppedConnections_;
+    if (acceptRateAdjustSpeed_ > 0) {
+      // aggressively decrease accept rate when in trouble
+      static const double kAcceptRateDecreaseSpeed = 0.1;
+      acceptRate_ *= 1 - kAcceptRateDecreaseSpeed;
+    }
+
+
+    if (callbackIndex_ == startingIndex) {
+      // The notification queue was full
+      // We can't really do anything at this point other than close the socket.
+      //
+      // This should only happen if a user's service is behaving extremely
+      // badly and none of the EventBase threads are looping fast enough to
+      // process the incoming connections.  If the service is overloaded, it
+      // should use pauseAccepting() to temporarily back off accepting new
+      // connections, before they reach the point where their threads can't
+      // even accept new messages.
+      LOG(ERROR) << "failed to dispatch newly accepted socket:"
+                 << " all accept callback queues are full";
+      ::close(socket);
+      return;
+    }
+
+    info = nextCallback();
+  }
+}
+
+void AsyncServerSocket::dispatchError(const char *msgstr, int errnoValue) {
+  uint32_t startingIndex = callbackIndex_;
+  CallbackInfo *info = nextCallback();
+
+  // Create a message to send over the notification queue
+  QueueMessage msg;
+  msg.type = MessageType::MSG_ERROR;
+  msg.err = errnoValue;
+  msg.msg = std::move(msgstr);
+
+  while (true) {
+    // Short circuit if the callback is in the primary EventBase thread
+    if (info->eventBase == nullptr) {
+      std::runtime_error ex(msgstr + errnoValue);
+      info->callback->acceptError(ex);
+      return;
+    }
+
+    if (info->consumer->getQueue()->tryPutMessageNoThrow(std::move(msg))) {
+      return;
+    }
+    // Fall through and try another callback
+
+    if (callbackIndex_ == startingIndex) {
+      // The notification queues for all of the callbacks were full.
+      // We can't really do anything at this point.
+      LOG(ERROR) << "failed to dispatch accept error: all accept callback "
+        "queues are full: error msg:  " <<
+        msg.msg.c_str() << errnoValue;
+      return;
+    }
+    info = nextCallback();
+  }
+}
+
+void AsyncServerSocket::enterBackoff() {
+  // If this is the first time we have entered the backoff state,
+  // allocate backoffTimeout_.
+  if (backoffTimeout_ == nullptr) {
+    try {
+      backoffTimeout_ = new BackoffTimeout(this);
+    } catch (const std::bad_alloc& ex) {
+      // Man, we couldn't even allocate the timer to re-enable accepts.
+      // We must be in pretty bad shape.  Don't pause accepting for now,
+      // since we won't be able to re-enable ourselves later.
+      LOG(ERROR) << "failed to allocate AsyncServerSocket backoff"
+                 << " timer; unable to temporarly pause accepting";
+      return;
+    }
+  }
+
+  // For now, we simply pause accepting for 1 second.
+  //
+  // We could add some smarter backoff calculation here in the future.  (e.g.,
+  // start sleeping for longer if we keep hitting the backoff frequently.)
+  // Typically the user needs to figure out why the server is overloaded and
+  // fix it in some other way, though.  The backoff timer is just a simple
+  // mechanism to try and give the connection processing code a little bit of
+  // breathing room to catch up, and to avoid just spinning and failing to
+  // accept over and over again.
+  const uint32_t timeoutMS = 1000;
+  if (!backoffTimeout_->scheduleTimeout(timeoutMS)) {
+    LOG(ERROR) << "failed to schedule AsyncServerSocket backoff timer;"
+               << "unable to temporarly pause accepting";
+    return;
+  }
+
+  // The backoff timer is scheduled to re-enable accepts.
+  // Go ahead and disable accepts for now.  We leave accepting_ set to true,
+  // since that tracks the desired state requested by the user.
+  for (auto& handler : sockets_) {
+    handler.unregisterHandler();
+  }
+}
+
+void AsyncServerSocket::backoffTimeoutExpired() {
+  // accepting_ should still be true.
+  // If pauseAccepting() was called while in the backoff state it will cancel
+  // the backoff timeout.
+  assert(accepting_);
+  // We can't be detached from the EventBase without being paused
+  assert(eventBase_ != nullptr && eventBase_->isInEventBaseThread());
+
+  // If all of the callbacks were removed, we shouldn't re-enable accepts
+  if (callbacks_.empty()) {
+    return;
+  }
+
+  // Register the handler.
+  for (auto& handler : sockets_) {
+    if (!handler.registerHandler(
+          EventHandler::READ | EventHandler::PERSIST)) {
+      // We're hosed.  We could just re-schedule backoffTimeout_ to
+      // re-try again after a little bit.  However, we don't want to
+      // loop retrying forever if we can't re-enable accepts.  Just
+      // abort the entire program in this state; things are really bad
+      // and restarting the entire server is probably the best remedy.
+      LOG(ERROR)
+        << "failed to re-enable AsyncServerSocket accepts after backoff; "
+        << "crashing now";
+      abort();
+    }
+  }
+}
+
+
+
+} // folly
diff --git a/folly/io/async/AsyncServerSocket.h b/folly/io/async/AsyncServerSocket.h
new file mode 100644 (file)
index 0000000..449b5a4
--- /dev/null
@@ -0,0 +1,682 @@
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <folly/io/async/DelayedDestruction.h>
+#include <folly/io/async/EventHandler.h>
+#include <folly/io/async/EventBase.h>
+#include <folly/io/async/NotificationQueue.h>
+#include <folly/io/async/AsyncTimeout.h>
+#include <folly/io/ShutdownSocketSet.h>
+#include <folly/SocketAddress.h>
+#include <memory>
+#include <exception>
+#include <vector>
+#include <limits.h>
+#include <stddef.h>
+#include <sys/socket.h>
+
+namespace folly {
+
+/**
+ * A listening socket that asynchronously informs a callback whenever a new
+ * connection has been accepted.
+ *
+ * Unlike most async interfaces that always invoke their callback in the same
+ * EventBase thread, AsyncServerSocket is unusual in that it can distribute
+ * the callbacks across multiple EventBase threads.
+ *
+ * This supports a common use case for network servers to distribute incoming
+ * connections across a number of EventBase threads.  (Servers typically run
+ * with one EventBase thread per CPU.)
+ *
+ * Despite being able to invoke callbacks in multiple EventBase threads,
+ * AsyncServerSocket still has one "primary" EventBase.  Operations that
+ * modify the AsyncServerSocket state may only be performed from the primary
+ * EventBase thread.
+ */
+class AsyncServerSocket : public DelayedDestruction {
+ public:
+  typedef std::unique_ptr<AsyncServerSocket, Destructor> UniquePtr;
+
+  class AcceptCallback {
+   public:
+    virtual ~AcceptCallback() {}
+
+    /**
+     * connectionAccepted() is called whenever a new client connection is
+     * received.
+     *
+     * The AcceptCallback will remain installed after connectionAccepted()
+     * returns.
+     *
+     * @param fd          The newly accepted client socket.  The AcceptCallback
+     *                    assumes ownership of this socket, and is responsible
+     *                    for closing it when done.  The newly accepted file
+     *                    descriptor will have already been put into
+     *                    non-blocking mode.
+     * @param clientAddr  A reference to a TSocketAddress struct containing the
+     *                    client's address.  This struct is only guaranteed to
+     *                    remain valid until connectionAccepted() returns.
+     */
+    virtual void connectionAccepted(int fd,
+                                    const SocketAddress& clientAddr)
+      noexcept = 0;
+
+    /**
+     * acceptError() is called if an error occurs while accepting.
+     *
+     * The AcceptCallback will remain installed even after an accept error,
+     * as the errors are typically somewhat transient, such as being out of
+     * file descriptors.  The server socket must be explicitly stopped if you
+     * wish to stop accepting after an error.
+     *
+     * @param ex  An exception representing the error.
+     */
+    virtual void acceptError(const std::exception& ex) noexcept = 0;
+
+    /**
+     * acceptStarted() will be called in the callback's EventBase thread
+     * after this callback has been added to the AsyncServerSocket.
+     *
+     * acceptStarted() will be called before any calls to connectionAccepted()
+     * or acceptError() are made on this callback.
+     *
+     * acceptStarted() makes it easier for callbacks to perform initialization
+     * inside the callback thread.  (The call to addAcceptCallback() must
+     * always be made from the AsyncServerSocket's primary EventBase thread.
+     * acceptStarted() provides a hook that will always be invoked in the
+     * callback's thread.)
+     *
+     * Note that the call to acceptStarted() is made once the callback is
+     * added, regardless of whether or not the AsyncServerSocket is actually
+     * accepting at the moment.  acceptStarted() will be called even if the
+     * AsyncServerSocket is paused when the callback is added (including if
+     * the initial call to startAccepting() on the AsyncServerSocket has not
+     * been made yet).
+     */
+    virtual void acceptStarted() noexcept {}
+
+    /**
+     * acceptStopped() will be called when this AcceptCallback is removed from
+     * the AsyncServerSocket, or when the AsyncServerSocket is destroyed,
+     * whichever occurs first.
+     *
+     * No more calls to connectionAccepted() or acceptError() will be made
+     * after acceptStopped() is invoked.
+     */
+    virtual void acceptStopped() noexcept {}
+  };
+
+  static const uint32_t kDefaultMaxAcceptAtOnce = 30;
+  static const uint32_t kDefaultCallbackAcceptAtOnce = 5;
+  static const uint32_t kDefaultMaxMessagesInQueue = 0;
+  /**
+   * Create a new AsyncServerSocket with the specified EventBase.
+   *
+   * @param eventBase  The EventBase to use for driving the asynchronous I/O.
+   *                   If this parameter is nullptr, attachEventBase() must be
+   *                   called before this socket can begin accepting
+   *                   connections.
+   */
+  explicit AsyncServerSocket(EventBase* eventBase = nullptr);
+
+  /**
+   * Helper function to create a shared_ptr<AsyncServerSocket>.
+   *
+   * This passes in the correct destructor object, since AsyncServerSocket's
+   * destructor is protected and cannot be invoked directly.
+   */
+  static std::shared_ptr<AsyncServerSocket>
+  newSocket(EventBase* evb = nullptr) {
+    return std::shared_ptr<AsyncServerSocket>(new AsyncServerSocket(evb),
+                                                 Destructor());
+  }
+
+  void setShutdownSocketSet(ShutdownSocketSet* newSS);
+
+  /**
+   * Destroy the socket.
+   *
+   * AsyncServerSocket::destroy() must be called to destroy the socket.
+   * The normal destructor is private, and should not be invoked directly.
+   * This prevents callers from deleting a AsyncServerSocket while it is
+   * invoking a callback.
+   *
+   * destroy() must be invoked from the socket's primary EventBase thread.
+   *
+   * If there are AcceptCallbacks still installed when destroy() is called,
+   * acceptStopped() will be called on these callbacks to notify them that
+   * accepting has stopped.  Accept callbacks being driven by other EventBase
+   * threads may continue to receive new accept callbacks for a brief period of
+   * time after destroy() returns.  They will not receive any more callback
+   * invocations once acceptStopped() is invoked.
+   */
+  virtual void destroy();
+
+  /**
+   * Attach this AsyncServerSocket to its primary EventBase.
+   *
+   * This may only be called if the AsyncServerSocket is not already attached
+   * to a EventBase.  The AsyncServerSocket must be attached to a EventBase
+   * before it can begin accepting connections.
+   */
+  void attachEventBase(EventBase *eventBase);
+
+  /**
+   * Detach the AsyncServerSocket from its primary EventBase.
+   *
+   * detachEventBase() may only be called if the AsyncServerSocket is not
+   * currently accepting connections.
+   */
+  void detachEventBase();
+
+  /**
+   * Get the EventBase used by this socket.
+   */
+  EventBase* getEventBase() const {
+    return eventBase_;
+  }
+
+  /**
+   * Create a AsyncServerSocket from an existing socket file descriptor.
+   *
+   * useExistingSocket() will cause the AsyncServerSocket to take ownership of
+   * the specified file descriptor, and use it to listen for new connections.
+   * The AsyncServerSocket will close the file descriptor when it is
+   * destroyed.
+   *
+   * useExistingSocket() must be called before bind() or listen().
+   *
+   * The supplied file descriptor will automatically be put into non-blocking
+   * mode.  The caller may have already directly called bind() and possibly
+   * listen on the file descriptor.  If so the caller should skip calling the
+   * corresponding AsyncServerSocket::bind() and listen() methods.
+   *
+   * On error a TTransportException will be thrown and the caller will retain
+   * ownership of the file descriptor.
+   */
+  void useExistingSocket(int fd);
+  void useExistingSockets(const std::vector<int>& fds);
+
+  /**
+   * Return the underlying file descriptor
+   */
+  std::vector<int> getSockets() const {
+    std::vector<int> sockets;
+    for (auto& handler : sockets_) {
+      sockets.push_back(handler.socket_);
+    }
+    return sockets;
+  }
+
+  /**
+   * Backwards compatible getSocket, warns if > 1 socket
+   */
+  int getSocket() const {
+    if (sockets_.size() > 1) {
+      VLOG(2) << "Warning: getSocket can return multiple fds, " <<
+        "but getSockets was not called, so only returning the first";
+    }
+    if (sockets_.size() == 0) {
+      return -1;
+    } else {
+      return sockets_[0].socket_;
+    }
+  }
+
+  /**
+   * Bind to the specified address.
+   *
+   * This must be called from the primary EventBase thread.
+   *
+   * Throws TTransportException on error.
+   */
+  virtual void bind(const SocketAddress& address);
+
+  /**
+   * Bind to the specified port.
+   *
+   * This must be called from the primary EventBase thread.
+   *
+   * Throws TTransportException on error.
+   */
+  virtual void bind(uint16_t port);
+
+  /**
+   * Get the local address to which the socket is bound.
+   *
+   * Throws TTransportException on error.
+   */
+  void getAddress(SocketAddress* addressReturn) const;
+
+  /**
+   * Get all the local addresses to which the socket is bound.
+   *
+   * Throws TTransportException on error.
+   */
+  std::vector<SocketAddress> getAddresses() const;
+
+  /**
+   * Begin listening for connections.
+   *
+   * This calls ::listen() with the specified backlog.
+   *
+   * Once listen() is invoked the socket will actually be open so that remote
+   * clients may establish connections.  (Clients that attempt to connect
+   * before listen() is called will receive a connection refused error.)
+   *
+   * At least one callback must be set and startAccepting() must be called to
+   * actually begin notifying the accept callbacks of newly accepted
+   * connections.  The backlog parameter controls how many connections the
+   * kernel will accept and buffer internally while the accept callbacks are
+   * paused (or if accepting is enabled but the callbacks cannot keep up).
+   *
+   * bind() must be called before calling listen().
+   * listen() must be called from the primary EventBase thread.
+   *
+   * Throws TTransportException on error.
+   */
+  virtual void listen(int backlog);
+
+  /**
+   * Add an AcceptCallback.
+   *
+   * When a new socket is accepted, one of the AcceptCallbacks will be invoked
+   * with the new socket.  The AcceptCallbacks are invoked in a round-robin
+   * fashion.  This allows the accepted sockets to distributed among a pool of
+   * threads, each running its own EventBase object.  This is a common model,
+   * since most asynchronous-style servers typically run one EventBase thread
+   * per CPU.
+   *
+   * The EventBase object associated with each AcceptCallback must be running
+   * its loop.  If the EventBase loop is not running, sockets will still be
+   * scheduled for the callback, but the callback cannot actually get invoked
+   * until the loop runs.
+   *
+   * This method must be invoked from the AsyncServerSocket's primary
+   * EventBase thread.
+   *
+   * Note that startAccepting() must be called on the AsyncServerSocket to
+   * cause it to actually start accepting sockets once callbacks have been
+   * installed.
+   *
+   * @param callback   The callback to invoke.
+   * @param eventBase  The EventBase to use to invoke the callback.  This
+   *     parameter may be nullptr, in which case the callback will be invoked in
+   *     the AsyncServerSocket's primary EventBase.
+   * @param maxAtOnce  The maximum number of connections to accept in this
+   *                   callback on a single iteration of the event base loop.
+   *                   This only takes effect when eventBase is non-nullptr.
+   *                   When using a nullptr eventBase for the callback, the
+   *                   setMaxAcceptAtOnce() method controls how many
+   *                   connections the main event base will accept at once.
+   */
+  virtual void addAcceptCallback(
+    AcceptCallback *callback,
+    EventBase *eventBase,
+    uint32_t maxAtOnce = kDefaultCallbackAcceptAtOnce);
+
+  /**
+   * Remove an AcceptCallback.
+   *
+   * This allows a single AcceptCallback to be removed from the round-robin
+   * pool.
+   *
+   * This method must be invoked from the AsyncServerSocket's primary
+   * EventBase thread.  Use EventBase::runInEventBaseThread() to schedule the
+   * operation in the correct EventBase if your code is not in the server
+   * socket's primary EventBase.
+   *
+   * Given that the accept callback is being driven by a different EventBase,
+   * the AcceptCallback may continue to be invoked for a short period of time
+   * after removeAcceptCallback() returns in this thread.  Once the other
+   * EventBase thread receives the notification to stop, it will call
+   * acceptStopped() on the callback to inform it that it is fully stopped and
+   * will not receive any new sockets.
+   *
+   * If the last accept callback is removed while the socket is accepting,
+   * the socket will implicitly pause accepting.  If a callback is later added,
+   * it will resume accepting immediately, without requiring startAccepting()
+   * to be invoked.
+   *
+   * @param callback   The callback to uninstall.
+   * @param eventBase  The EventBase associated with this callback.  This must
+   *     be the same EventBase that was used when the callback was installed
+   *     with addAcceptCallback().
+   */
+  void removeAcceptCallback(AcceptCallback *callback, EventBase *eventBase);
+
+  /**
+   * Begin accepting connctions on this socket.
+   *
+   * bind() and listen() must be called before calling startAccepting().
+   *
+   * When a AsyncServerSocket is initially created, it will not begin
+   * accepting connections until at least one callback has been added and
+   * startAccepting() has been called.  startAccepting() can also be used to
+   * resume accepting connections after a call to pauseAccepting().
+   *
+   * If startAccepting() is called when there are no accept callbacks
+   * installed, the socket will not actually begin accepting until an accept
+   * callback is added.
+   *
+   * This method may only be called from the primary EventBase thread.
+   */
+  virtual void startAccepting();
+
+  /**
+   * Pause accepting connections.
+   *
+   * startAccepting() may be called to resume accepting.
+   *
+   * This method may only be called from the primary EventBase thread.
+   * If there are AcceptCallbacks being driven by other EventBase threads they
+   * may continue to receive callbacks for a short period of time after
+   * pauseAccepting() returns.
+   *
+   * Unlike removeAcceptCallback() or destroy(), acceptStopped() will not be
+   * called on the AcceptCallback objects simply due to a temporary pause.  If
+   * the server socket is later destroyed while paused, acceptStopped() will be
+   * called all of the installed AcceptCallbacks.
+   */
+  void pauseAccepting();
+
+  /**
+   * Shutdown the listen socket and notify all callbacks that accept has
+   * stopped, but don't close the socket.  This invokes shutdown(2) with the
+   * supplied argument.  Passing -1 will close the socket now.  Otherwise, the
+   * close will be delayed until this object is destroyed.
+   *
+   * Only use this if you have reason to pass special flags to shutdown.
+   * Otherwise just destroy the socket.
+   *
+   * This method has no effect when a ShutdownSocketSet option is used.
+   *
+   * Returns the result of shutdown on sockets_[n-1]
+   */
+  int stopAccepting(int shutdownFlags = -1);
+
+  /**
+   * Get the maximum number of connections that will be accepted each time
+   * around the event loop.
+   */
+  uint32_t getMaxAcceptAtOnce() const {
+    return maxAcceptAtOnce_;
+  }
+
+  /**
+   * Set the maximum number of connections that will be accepted each time
+   * around the event loop.
+   *
+   * This provides a very coarse-grained way of controlling how fast the
+   * AsyncServerSocket will accept connections.  If you find that when your
+   * server is overloaded AsyncServerSocket accepts connections more quickly
+   * than your code can process them, you can try lowering this number so that
+   * fewer connections will be accepted each event loop iteration.
+   *
+   * For more explicit control over the accept rate, you can also use
+   * pauseAccepting() to temporarily pause accepting when your server is
+   * overloaded, and then use startAccepting() later to resume accepting.
+   */
+  void setMaxAcceptAtOnce(uint32_t numConns) {
+    maxAcceptAtOnce_ = numConns;
+  }
+
+  /**
+   * Get the maximum number of unprocessed messages which a NotificationQueue
+   * can hold.
+   */
+  uint32_t getMaxNumMessagesInQueue() const {
+    return maxNumMsgsInQueue_;
+  }
+
+  /**
+   * Set the maximum number of unprocessed messages in NotificationQueue.
+   * No new message will be sent to that NotificationQueue if there are more
+   * than such number of unprocessed messages in that queue.
+   *
+   * Only works if called before addAcceptCallback.
+   */
+  void setMaxNumMessagesInQueue(uint32_t num) {
+    maxNumMsgsInQueue_ = num;
+  }
+
+  /**
+   * Get the speed of adjusting connection accept rate.
+   */
+  double getAcceptRateAdjustSpeed() const {
+    return acceptRateAdjustSpeed_;
+  }
+
+  /**
+   * Set the speed of adjusting connection accept rate.
+   */
+  void setAcceptRateAdjustSpeed(double speed) {
+    acceptRateAdjustSpeed_ = speed;
+  }
+
+  /**
+   * Get the number of connections dropped by the AsyncServerSocket
+   */
+  uint64_t getNumDroppedConnections() const {
+    return numDroppedConnections_;
+  }
+
+  /**
+   * Set whether or not SO_KEEPALIVE should be enabled on the server socket
+   * (and thus on all subsequently-accepted connections). By default, keepalive
+   * is enabled.
+   *
+   * Note that TCP keepalive usually only kicks in after the connection has
+   * been idle for several hours. Applications should almost always have their
+   * own, shorter idle timeout.
+   */
+  void setKeepAliveEnabled(bool enabled) {
+    keepAliveEnabled_ = enabled;
+
+    for (auto& handler : sockets_) {
+      if (handler.socket_ < 0) {
+        continue;
+      }
+
+      int val = (enabled) ? 1 : 0;
+      if (setsockopt(handler.socket_, SOL_SOCKET,
+                     SO_KEEPALIVE, &val, sizeof(val)) != 0) {
+        LOG(ERROR) << "failed to set SO_KEEPALIVE on async server socket: %s" <<
+                strerror(errno);
+      }
+    }
+  }
+
+  /**
+   * Get whether or not SO_KEEPALIVE is enabled on the server socket.
+   */
+  bool getKeepAliveEnabled() const {
+    return keepAliveEnabled_;
+  }
+
+  /**
+   * Set whether or not the socket should close during exec() (FD_CLOEXEC). By
+   * default, this is enabled
+   */
+  void setCloseOnExec(bool closeOnExec) {
+    closeOnExec_ = closeOnExec;
+  }
+
+  /**
+   * Get whether or not FD_CLOEXEC is enabled on the server socket.
+   */
+  bool getCloseOnExec() const {
+    return closeOnExec_;
+  }
+
+ protected:
+  /**
+   * Protected destructor.
+   *
+   * Invoke destroy() instead to destroy the AsyncServerSocket.
+   */
+  virtual ~AsyncServerSocket();
+
+ private:
+  enum class MessageType {
+    MSG_NEW_CONN = 0,
+    MSG_ERROR = 1
+  };
+
+  struct QueueMessage {
+    MessageType type;
+    int fd;
+    int err;
+    SocketAddress address;
+    std::string msg;
+  };
+
+  /**
+   * A class to receive notifications to invoke AcceptCallback objects
+   * in other EventBase threads.
+   *
+   * A RemoteAcceptor object is created for each AcceptCallback that
+   * is installed in a separate EventBase thread.  The RemoteAcceptor
+   * receives notification of new sockets via a NotificationQueue,
+   * and then invokes the AcceptCallback.
+   */
+  class RemoteAcceptor
+      : private NotificationQueue<QueueMessage>::Consumer {
+  public:
+    explicit RemoteAcceptor(AcceptCallback *callback)
+      : callback_(callback) {}
+
+    ~RemoteAcceptor() {}
+
+    void start(EventBase *eventBase, uint32_t maxAtOnce, uint32_t maxInQueue);
+    void stop(EventBase* eventBase, AcceptCallback* callback);
+
+    virtual void messageAvailable(QueueMessage&& message);
+
+    NotificationQueue<QueueMessage>* getQueue() {
+      return &queue_;
+    }
+
+  private:
+    AcceptCallback *callback_;
+
+    NotificationQueue<QueueMessage> queue_;
+  };
+
+  /**
+   * A struct to keep track of the callbacks associated with this server
+   * socket.
+   */
+  struct CallbackInfo {
+    CallbackInfo(AcceptCallback *cb, EventBase *evb)
+      : callback(cb),
+        eventBase(evb),
+        consumer(nullptr) {}
+
+    AcceptCallback *callback;
+    EventBase *eventBase;
+
+    RemoteAcceptor* consumer;
+  };
+
+  class BackoffTimeout;
+
+  virtual void handlerReady(
+    uint16_t events, int socket, sa_family_t family) noexcept;
+
+  int createSocket(int family);
+  void setupSocket(int fd);
+  void dispatchSocket(int socket, SocketAddress&& address);
+  void dispatchError(const char *msg, int errnoValue);
+  void enterBackoff();
+  void backoffTimeoutExpired();
+
+  CallbackInfo* nextCallback() {
+    CallbackInfo* info = &callbacks_[callbackIndex_];
+
+    ++callbackIndex_;
+    if (callbackIndex_ >= callbacks_.size()) {
+      callbackIndex_ = 0;
+    }
+
+    return info;
+  }
+
+  struct ServerEventHandler : public EventHandler {
+    ServerEventHandler(EventBase* eventBase, int socket,
+                       AsyncServerSocket* parent,
+                      sa_family_t addressFamily)
+        : EventHandler(eventBase, socket)
+        , eventBase_(eventBase)
+        , socket_(socket)
+        , parent_(parent)
+        , addressFamily_(addressFamily) {}
+
+    ServerEventHandler(const ServerEventHandler& other)
+    : EventHandler(other.eventBase_, other.socket_)
+    , eventBase_(other.eventBase_)
+    , socket_(other.socket_)
+    , parent_(other.parent_)
+    , addressFamily_(other.addressFamily_) {}
+
+    ServerEventHandler& operator=(
+        const ServerEventHandler& other) {
+      if (this != &other) {
+        eventBase_ = other.eventBase_;
+        socket_ = other.socket_;
+        parent_ = other.parent_;
+        addressFamily_ = other.addressFamily_;
+
+        detachEventBase();
+        attachEventBase(other.eventBase_);
+        changeHandlerFD(other.socket_);
+      }
+      return *this;
+    }
+
+    // Inherited from EventHandler
+    virtual void handlerReady(uint16_t events) noexcept {
+      parent_->handlerReady(events, socket_, addressFamily_);
+    }
+
+    EventBase* eventBase_;
+    int socket_;
+    AsyncServerSocket* parent_;
+    sa_family_t addressFamily_;
+  };
+
+  EventBase *eventBase_;
+  std::vector<ServerEventHandler> sockets_;
+  std::vector<int> pendingCloseSockets_;
+  bool accepting_;
+  uint32_t maxAcceptAtOnce_;
+  uint32_t maxNumMsgsInQueue_;
+  double acceptRateAdjustSpeed_;  //0 to disable auto adjust
+  double acceptRate_;
+  std::chrono::time_point<std::chrono::steady_clock> lastAccepTimestamp_;
+  uint64_t numDroppedConnections_;
+  uint32_t callbackIndex_;
+  BackoffTimeout *backoffTimeout_;
+  std::vector<CallbackInfo> callbacks_;
+  bool keepAliveEnabled_;
+  bool closeOnExec_;
+  ShutdownSocketSet* shutdownSocketSet_;
+};
+
+} // folly