--- /dev/null
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __STDC_FORMAT_MACROS
+ #define __STDC_FORMAT_MACROS
+#endif
+
+#include <folly/io/async/AsyncServerSocket.h>
+
+#include <folly/io/async/EventBase.h>
+#include <folly/io/async/NotificationQueue.h>
+#include <folly/SocketAddress.h>
+
+#include <errno.h>
+#include <string.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/tcp.h>
+
+namespace folly {
+
+const uint32_t AsyncServerSocket::kDefaultMaxAcceptAtOnce;
+const uint32_t AsyncServerSocket::kDefaultCallbackAcceptAtOnce;
+const uint32_t AsyncServerSocket::kDefaultMaxMessagesInQueue;
+
+int setCloseOnExec(int fd, int value) {
+ // Read the current flags
+ int old_flags = fcntl(fd, F_GETFD, 0);
+
+ // If reading the flags failed, return error indication now
+ if (old_flags < 0)
+ return -1;
+
+ // Set just the flag we want to set
+ int new_flags;
+ if (value != 0)
+ new_flags = old_flags | FD_CLOEXEC;
+ else
+ new_flags = old_flags & ~FD_CLOEXEC;
+
+ // Store modified flag word in the descriptor
+ return fcntl(fd, F_SETFD, new_flags);
+}
+
+void AsyncServerSocket::RemoteAcceptor::start(
+ EventBase* eventBase, uint32_t maxAtOnce, uint32_t maxInQueue) {
+ setMaxReadAtOnce(maxAtOnce);
+ queue_.setMaxQueueSize(maxInQueue);
+
+ if (!eventBase->runInEventBaseThread([=](){
+ callback_->acceptStarted();
+ this->startConsuming(eventBase, &queue_);
+ })) {
+ throw std::invalid_argument("unable to start waiting on accept "
+ "notification queue in the specified "
+ "EventBase thread");
+ }
+}
+
+void AsyncServerSocket::RemoteAcceptor::stop(
+ EventBase* eventBase, AcceptCallback* callback) {
+ if (!eventBase->runInEventBaseThread([=](){
+ callback->acceptStopped();
+ delete this;
+ })) {
+ throw std::invalid_argument("unable to start waiting on accept "
+ "notification queue in the specified "
+ "EventBase thread");
+ }
+}
+
+void AsyncServerSocket::RemoteAcceptor::messageAvailable(
+ QueueMessage&& msg) {
+
+ switch (msg.type) {
+ case MessageType::MSG_NEW_CONN:
+ {
+ callback_->connectionAccepted(msg.fd, msg.address);
+ break;
+ }
+ case MessageType::MSG_ERROR:
+ {
+ std::runtime_error ex(msg.msg);
+ callback_->acceptError(ex);
+ break;
+ }
+ default:
+ {
+ LOG(ERROR) << "invalid accept notification message type "
+ << int(msg.type);
+ std::runtime_error ex(
+ "received invalid accept notification message type");
+ callback_->acceptError(ex);
+ }
+ }
+}
+
+/*
+ * AsyncServerSocket::BackoffTimeout
+ */
+class AsyncServerSocket::BackoffTimeout : public AsyncTimeout {
+ public:
+ BackoffTimeout(AsyncServerSocket* socket)
+ : AsyncTimeout(socket->getEventBase()),
+ socket_(socket) {}
+
+ virtual void timeoutExpired() noexcept {
+ socket_->backoffTimeoutExpired();
+ }
+
+ private:
+ AsyncServerSocket* socket_;
+};
+
+/*
+ * AsyncServerSocket methods
+ */
+
+AsyncServerSocket::AsyncServerSocket(EventBase* eventBase)
+: eventBase_(eventBase),
+ accepting_(false),
+ maxAcceptAtOnce_(kDefaultMaxAcceptAtOnce),
+ maxNumMsgsInQueue_(kDefaultMaxMessagesInQueue),
+ acceptRateAdjustSpeed_(0),
+ acceptRate_(1),
+ lastAccepTimestamp_(std::chrono::steady_clock::now()),
+ numDroppedConnections_(0),
+ callbackIndex_(0),
+ backoffTimeout_(nullptr),
+ callbacks_(),
+ keepAliveEnabled_(true),
+ closeOnExec_(true),
+ shutdownSocketSet_(nullptr) {
+}
+
+void AsyncServerSocket::setShutdownSocketSet(ShutdownSocketSet* newSS) {
+ if (shutdownSocketSet_ == newSS) {
+ return;
+ }
+ if (shutdownSocketSet_) {
+ for (auto& h : sockets_) {
+ shutdownSocketSet_->remove(h.socket_);
+ }
+ }
+ shutdownSocketSet_ = newSS;
+ if (shutdownSocketSet_) {
+ for (auto& h : sockets_) {
+ shutdownSocketSet_->add(h.socket_);
+ }
+ }
+}
+
+AsyncServerSocket::~AsyncServerSocket() {
+ assert(callbacks_.empty());
+}
+
+int AsyncServerSocket::stopAccepting(int shutdownFlags) {
+ int result = 0;
+ for (auto& handler : sockets_) {
+ VLOG(10) << "AsyncServerSocket::stopAccepting " << this <<
+ handler.socket_;
+ }
+ assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
+
+ // When destroy is called, unregister and close the socket immediately
+ accepting_ = false;
+
+ for (auto& handler : sockets_) {
+ handler.unregisterHandler();
+ if (shutdownSocketSet_) {
+ shutdownSocketSet_->close(handler.socket_);
+ } else if (shutdownFlags >= 0) {
+ result = ::shutdown(handler.socket_, shutdownFlags);
+ pendingCloseSockets_.push_back(handler.socket_);
+ } else {
+ ::close(handler.socket_);
+ }
+ }
+ sockets_.clear();
+
+ // Destroy the backoff timout. This will cancel it if it is running.
+ delete backoffTimeout_;
+ backoffTimeout_ = nullptr;
+
+ // Close all of the callback queues to notify them that they are being
+ // destroyed. No one should access the AsyncServerSocket any more once
+ // destroy() is called. However, clear out callbacks_ before invoking the
+ // accept callbacks just in case. This will potentially help us detect the
+ // bug if one of the callbacks calls addAcceptCallback() or
+ // removeAcceptCallback().
+ std::vector<CallbackInfo> callbacksCopy;
+ callbacks_.swap(callbacksCopy);
+ for (std::vector<CallbackInfo>::iterator it = callbacksCopy.begin();
+ it != callbacksCopy.end();
+ ++it) {
+ it->consumer->stop(it->eventBase, it->callback);
+ }
+
+ return result;
+}
+
+void AsyncServerSocket::destroy() {
+ stopAccepting();
+ for (auto s: pendingCloseSockets_) {
+ ::close(s);
+ }
+ // Then call DelayedDestruction::destroy() to take care of
+ // whether or not we need immediate or delayed destruction
+ DelayedDestruction::destroy();
+}
+
+void AsyncServerSocket::attachEventBase(EventBase *eventBase) {
+ assert(eventBase_ == nullptr);
+ assert(eventBase->isInEventBaseThread());
+
+ eventBase_ = eventBase;
+ for (auto& handler : sockets_) {
+ handler.attachEventBase(eventBase);
+ }
+}
+
+void AsyncServerSocket::detachEventBase() {
+ assert(eventBase_ != nullptr);
+ assert(eventBase_->isInEventBaseThread());
+ assert(!accepting_);
+
+ eventBase_ = nullptr;
+ for (auto& handler : sockets_) {
+ handler.detachEventBase();
+ }
+}
+
+void AsyncServerSocket::useExistingSockets(const std::vector<int>& fds) {
+ assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
+
+ if (sockets_.size() > 0) {
+ throw std::invalid_argument(
+ "cannot call useExistingSocket() on a "
+ "AsyncServerSocket that already has a socket");
+ }
+
+ for (auto fd: fds) {
+ // Set addressFamily_ from this socket.
+ // Note that the socket may not have been bound yet, but
+ // setFromLocalAddress() will still work and get the correct address family.
+ // We will update addressFamily_ again anyway if bind() is called later.
+ SocketAddress address;
+ address.setFromLocalAddress(fd);
+
+ setupSocket(fd);
+ sockets_.push_back(
+ ServerEventHandler(eventBase_, fd, this, address.getFamily()));
+ sockets_.back().changeHandlerFD(fd);
+ }
+}
+
+void AsyncServerSocket::useExistingSocket(int fd) {
+ useExistingSockets({fd});
+}
+
+void AsyncServerSocket::bind(const SocketAddress& address) {
+ assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
+
+ // useExistingSocket() may have been called to initialize socket_ already.
+ // However, in the normal case we need to create a new socket now.
+ // Don't set socket_ yet, so that socket_ will remain uninitialized if an
+ // error occurs.
+ int fd;
+ if (sockets_.size() == 0) {
+ fd = createSocket(address.getFamily());
+ } else if (sockets_.size() == 1) {
+ if (address.getFamily() != sockets_[0].addressFamily_) {
+ throw std::invalid_argument(
+ "Attempted to bind address to socket with "
+ "different address family");
+ }
+ fd = sockets_[0].socket_;
+ } else {
+ throw std::invalid_argument(
+ "Attempted to bind to multiple fds");
+ }
+
+ // Bind to the socket
+ sockaddr_storage addrStorage;
+ address.getAddress(&addrStorage);
+ sockaddr* saddr = reinterpret_cast<sockaddr*>(&addrStorage);
+ if (::bind(fd, saddr, address.getActualSize()) != 0) {
+ if (sockets_.size() == 0) {
+ ::close(fd);
+ }
+ folly::throwSystemError(errno,
+ "failed to bind to async server socket: " +
+ address.describe());
+ }
+
+ // Record the address family that we are using,
+ // so we know how much address space we need to record accepted addresses.
+
+ // If we just created this socket, update the EventHandler and set socket_
+ if (sockets_.size() == 0) {
+ sockets_.push_back(
+ ServerEventHandler(eventBase_, fd, this, address.getFamily()));
+ sockets_[0].changeHandlerFD(fd);
+ }
+}
+
+void AsyncServerSocket::bind(uint16_t port) {
+ struct addrinfo hints, *res, *res0;
+ char sport[sizeof("65536")];
+
+ memset(&hints, 0, sizeof(hints));
+ hints.ai_family = AF_UNSPEC;
+ hints.ai_socktype = SOCK_STREAM;
+ hints.ai_flags = AI_PASSIVE;
+ snprintf(sport, sizeof(sport), "%u", port);
+
+ if (getaddrinfo(nullptr, sport, &hints, &res0)) {
+ throw std::invalid_argument(
+ "Attempted to bind address to socket with "
+ "bad getaddrinfo");
+ }
+
+ folly::ScopeGuard guard = folly::makeGuard([&]{
+ freeaddrinfo(res0);
+ });
+ DCHECK(&guard);
+
+ for (res = res0; res; res = res->ai_next) {
+ int s = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
+ // IPv6/IPv4 may not be supported by the kernel
+ if (s < 0 && errno == EAFNOSUPPORT) {
+ continue;
+ }
+ CHECK(s);
+
+ try {
+ setupSocket(s);
+ } catch (...) {
+ ::close(s);
+ throw;
+ }
+
+ if (res->ai_family == AF_INET6) {
+ int v6only = 1;
+ CHECK(0 == setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY,
+ &v6only, sizeof(v6only)));
+ }
+
+ SocketAddress address;
+ address.setFromLocalAddress(s);
+
+ sockets_.push_back(
+ ServerEventHandler(eventBase_, s, this, address.getFamily()));
+
+ // Bind to the socket
+ if (::bind(s, res->ai_addr, res->ai_addrlen) != 0) {
+ folly::throwSystemError(
+ errno,
+ "failed to bind to async server socket for port");
+ }
+ }
+ if (sockets_.size() == 0) {
+ throw std::runtime_error(
+ "did not bind any async server socket for port");
+ }
+}
+
+void AsyncServerSocket::listen(int backlog) {
+ assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
+
+ // Start listening
+ for (auto& handler : sockets_) {
+ if (::listen(handler.socket_, backlog) == -1) {
+ folly::throwSystemError(errno,
+ "failed to listen on async server socket");
+ }
+ }
+}
+
+void AsyncServerSocket::getAddress(SocketAddress* addressReturn) const {
+ CHECK(sockets_.size() >= 1);
+ if (sockets_.size() > 1) {
+ VLOG(2) << "Warning: getAddress can return multiple addresses, " <<
+ "but getAddress was called, so only returning the first";
+ }
+ addressReturn->setFromLocalAddress(sockets_[0].socket_);
+}
+
+std::vector<SocketAddress> AsyncServerSocket::getAddresses()
+ const {
+ CHECK(sockets_.size() >= 1);
+ auto tsaVec = std::vector<SocketAddress>(sockets_.size());
+ auto tsaIter = tsaVec.begin();
+ for (const auto& socket : sockets_) {
+ (tsaIter++)->setFromLocalAddress(socket.socket_);
+ };
+ return tsaVec;
+}
+
+void AsyncServerSocket::addAcceptCallback(AcceptCallback *callback,
+ EventBase *eventBase,
+ uint32_t maxAtOnce) {
+ assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
+
+ // If this is the first accept callback and we are supposed to be accepting,
+ // start accepting once the callback is installed.
+ bool runStartAccepting = accepting_ && callbacks_.empty();
+
+ if (!eventBase) {
+ eventBase = eventBase_; // Run in AsyncServerSocket's eventbase
+ }
+
+ callbacks_.push_back(CallbackInfo(callback, eventBase));
+
+ // Start the remote acceptor.
+ //
+ // It would be nice if we could avoid starting the remote acceptor if
+ // eventBase == eventBase_. However, that would cause issues if
+ // detachEventBase() and attachEventBase() were ever used to change the
+ // primary EventBase for the server socket. Therefore we require the caller
+ // to specify a nullptr EventBase if they want to ensure that the callback is
+ // always invoked in the primary EventBase, and to be able to invoke that
+ // callback more efficiently without having to use a notification queue.
+ RemoteAcceptor* acceptor = nullptr;
+ try {
+ acceptor = new RemoteAcceptor(callback);
+ acceptor->start(eventBase, maxAtOnce, maxNumMsgsInQueue_);
+ } catch (...) {
+ callbacks_.pop_back();
+ delete acceptor;
+ throw;
+ }
+ callbacks_.back().consumer = acceptor;
+
+ // If this is the first accept callback and we are supposed to be accepting,
+ // start accepting.
+ if (runStartAccepting) {
+ startAccepting();
+ }
+}
+
+void AsyncServerSocket::removeAcceptCallback(AcceptCallback *callback,
+ EventBase *eventBase) {
+ assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
+
+ // Find the matching AcceptCallback.
+ // We just do a simple linear search; we don't expect removeAcceptCallback()
+ // to be called frequently, and we expect there to only be a small number of
+ // callbacks anyway.
+ std::vector<CallbackInfo>::iterator it = callbacks_.begin();
+ uint32_t n = 0;
+ while (true) {
+ if (it == callbacks_.end()) {
+ throw std::runtime_error("AsyncServerSocket::removeAcceptCallback(): "
+ "accept callback not found");
+ }
+ if (it->callback == callback &&
+ (it->eventBase == eventBase || eventBase == nullptr)) {
+ break;
+ }
+ ++it;
+ ++n;
+ }
+
+ // Remove this callback from callbacks_.
+ //
+ // Do this before invoking the acceptStopped() callback, in case
+ // acceptStopped() invokes one of our methods that examines callbacks_.
+ //
+ // Save a copy of the CallbackInfo first.
+ CallbackInfo info(*it);
+ callbacks_.erase(it);
+ if (n < callbackIndex_) {
+ // We removed an element before callbackIndex_. Move callbackIndex_ back
+ // one step, since things after n have been shifted back by 1.
+ --callbackIndex_;
+ } else {
+ // We removed something at or after callbackIndex_.
+ // If we removed the last element and callbackIndex_ was pointing at it,
+ // we need to reset callbackIndex_ to 0.
+ if (callbackIndex_ >= callbacks_.size()) {
+ callbackIndex_ = 0;
+ }
+ }
+
+ info.consumer->stop(info.eventBase, info.callback);
+
+ // If we are supposed to be accepting but the last accept callback
+ // was removed, unregister for events until a callback is added.
+ if (accepting_ && callbacks_.empty()) {
+ for (auto& handler : sockets_) {
+ handler.unregisterHandler();
+ }
+ }
+}
+
+void AsyncServerSocket::startAccepting() {
+ assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
+
+ accepting_ = true;
+ if (callbacks_.empty()) {
+ // We can't actually begin accepting if no callbacks are defined.
+ // Wait until a callback is added to start accepting.
+ return;
+ }
+
+ for (auto& handler : sockets_) {
+ if (!handler.registerHandler(
+ EventHandler::READ | EventHandler::PERSIST)) {
+ throw std::runtime_error("failed to register for accept events");
+ }
+ }
+}
+
+void AsyncServerSocket::pauseAccepting() {
+ assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
+ accepting_ = false;
+ for (auto& handler : sockets_) {
+ handler. unregisterHandler();
+ }
+
+ // If we were in the accept backoff state, disable the backoff timeout
+ if (backoffTimeout_) {
+ backoffTimeout_->cancelTimeout();
+ }
+}
+
+int AsyncServerSocket::createSocket(int family) {
+ int fd = socket(family, SOCK_STREAM, 0);
+ if (fd == -1) {
+ folly::throwSystemError(errno, "error creating async server socket");
+ }
+
+ try {
+ setupSocket(fd);
+ } catch (...) {
+ ::close(fd);
+ throw;
+ }
+ return fd;
+}
+
+void AsyncServerSocket::setupSocket(int fd) {
+ // Get the address family
+ SocketAddress address;
+ address.setFromLocalAddress(fd);
+ auto family = address.getFamily();
+
+ // Put the socket in non-blocking mode
+ if (fcntl(fd, F_SETFL, O_NONBLOCK) != 0) {
+ folly::throwSystemError(errno,
+ "failed to put socket in non-blocking mode");
+ }
+
+ // Set reuseaddr to avoid 2MSL delay on server restart
+ int one = 1;
+ if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) != 0) {
+ // This isn't a fatal error; just log an error message and continue
+ LOG(ERROR) << "failed to set SO_REUSEADDR on async server socket " << errno;
+ }
+
+ // Set keepalive as desired
+ int zero = 0;
+ if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE,
+ (keepAliveEnabled_) ? &one : &zero, sizeof(int)) != 0) {
+ LOG(ERROR) << "failed to set SO_KEEPALIVE on async server socket: " <<
+ strerror(errno);
+ }
+
+ // Setup FD_CLOEXEC flag
+ if (closeOnExec_ &&
+ (-1 == folly::setCloseOnExec(fd, closeOnExec_))) {
+ LOG(ERROR) << "failed to set FD_CLOEXEC on async server socket: " <<
+ strerror(errno);
+ }
+
+ // Set TCP nodelay if available, MAC OS X Hack
+ // See http://lists.danga.com/pipermail/memcached/2005-March/001240.html
+#ifndef TCP_NOPUSH
+ if (family != AF_UNIX) {
+ if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one)) != 0) {
+ // This isn't a fatal error; just log an error message and continue
+ LOG(ERROR) << "failed to set TCP_NODELAY on async server socket: " <<
+ strerror(errno);
+ }
+ }
+#endif
+
+ if (shutdownSocketSet_) {
+ shutdownSocketSet_->add(fd);
+ }
+}
+
+void AsyncServerSocket::handlerReady(
+ uint16_t events, int fd, sa_family_t addressFamily) noexcept {
+ assert(!callbacks_.empty());
+ DestructorGuard dg(this);
+
+ // Only accept up to maxAcceptAtOnce_ connections at a time,
+ // to avoid starving other I/O handlers using this EventBase.
+ for (uint32_t n = 0; n < maxAcceptAtOnce_; ++n) {
+ SocketAddress address;
+
+ sockaddr_storage addrStorage;
+ socklen_t addrLen = sizeof(addrStorage);
+ sockaddr* saddr = reinterpret_cast<sockaddr*>(&addrStorage);
+
+ // In some cases, accept() doesn't seem to update these correctly.
+ saddr->sa_family = addressFamily;
+ if (addressFamily == AF_UNIX) {
+ addrLen = sizeof(struct sockaddr_un);
+ }
+
+ // Accept a new client socket
+#ifdef SOCK_NONBLOCK
+ int clientSocket = accept4(fd, saddr, &addrLen, SOCK_NONBLOCK);
+#else
+ int clientSocket = accept(fd, saddr, &addrLen);
+#endif
+
+ address.setFromSockaddr(saddr, addrLen);
+
+ std::chrono::time_point<std::chrono::steady_clock> nowMs =
+ std::chrono::steady_clock::now();
+ int64_t timeSinceLastAccept = std::max(
+ int64_t(0),
+ nowMs.time_since_epoch().count() -
+ lastAccepTimestamp_.time_since_epoch().count());
+ lastAccepTimestamp_ = nowMs;
+ if (acceptRate_ < 1) {
+ acceptRate_ *= 1 + acceptRateAdjustSpeed_ * timeSinceLastAccept;
+ if (acceptRate_ >= 1) {
+ acceptRate_ = 1;
+ } else if (rand() > acceptRate_ * RAND_MAX) {
+ ++numDroppedConnections_;
+ if (clientSocket >= 0) {
+ ::close(clientSocket);
+ }
+ continue;
+ }
+ }
+
+ if (clientSocket < 0) {
+ if (errno == EAGAIN) {
+ // No more sockets to accept right now.
+ // Check for this code first, since it's the most common.
+ return;
+ } else if (errno == EMFILE || errno == ENFILE) {
+ // We're out of file descriptors. Perhaps we're accepting connections
+ // too quickly. Pause accepting briefly to back off and give the server
+ // a chance to recover.
+ LOG(ERROR) << "accept failed: out of file descriptors; entering accept "
+ "back-off state";
+ enterBackoff();
+
+ // Dispatch the error message
+ dispatchError("accept() failed", errno);
+ } else {
+ dispatchError("accept() failed", errno);
+ }
+ return;
+ }
+
+#ifndef SOCK_NONBLOCK
+ // Explicitly set the new connection to non-blocking mode
+ if (fcntl(clientSocket, F_SETFL, O_NONBLOCK) != 0) {
+ ::close(clientSocket);
+ dispatchError("failed to set accepted socket to non-blocking mode",
+ errno);
+ return;
+ }
+#endif
+
+ // Inform the callback about the new connection
+ dispatchSocket(clientSocket, std::move(address));
+
+ // If we aren't accepting any more, break out of the loop
+ if (!accepting_ || callbacks_.empty()) {
+ break;
+ }
+ }
+}
+
+void AsyncServerSocket::dispatchSocket(int socket,
+ SocketAddress&& address) {
+ uint32_t startingIndex = callbackIndex_;
+
+ // Short circuit if the callback is in the primary EventBase thread
+
+ CallbackInfo *info = nextCallback();
+ if (info->eventBase == nullptr) {
+ info->callback->connectionAccepted(socket, address);
+ return;
+ }
+
+ // Create a message to send over the notification queue
+ QueueMessage msg;
+ msg.type = MessageType::MSG_NEW_CONN;
+ msg.address = std::move(address);
+ msg.fd = socket;
+
+ // Loop until we find a free queue to write to
+ while (true) {
+ if (info->consumer->getQueue()->tryPutMessageNoThrow(std::move(msg))) {
+ // Success! return.
+ return;
+ }
+
+ // We couldn't add to queue. Fall through to below
+
+ ++numDroppedConnections_;
+ if (acceptRateAdjustSpeed_ > 0) {
+ // aggressively decrease accept rate when in trouble
+ static const double kAcceptRateDecreaseSpeed = 0.1;
+ acceptRate_ *= 1 - kAcceptRateDecreaseSpeed;
+ }
+
+
+ if (callbackIndex_ == startingIndex) {
+ // The notification queue was full
+ // We can't really do anything at this point other than close the socket.
+ //
+ // This should only happen if a user's service is behaving extremely
+ // badly and none of the EventBase threads are looping fast enough to
+ // process the incoming connections. If the service is overloaded, it
+ // should use pauseAccepting() to temporarily back off accepting new
+ // connections, before they reach the point where their threads can't
+ // even accept new messages.
+ LOG(ERROR) << "failed to dispatch newly accepted socket:"
+ << " all accept callback queues are full";
+ ::close(socket);
+ return;
+ }
+
+ info = nextCallback();
+ }
+}
+
+void AsyncServerSocket::dispatchError(const char *msgstr, int errnoValue) {
+ uint32_t startingIndex = callbackIndex_;
+ CallbackInfo *info = nextCallback();
+
+ // Create a message to send over the notification queue
+ QueueMessage msg;
+ msg.type = MessageType::MSG_ERROR;
+ msg.err = errnoValue;
+ msg.msg = std::move(msgstr);
+
+ while (true) {
+ // Short circuit if the callback is in the primary EventBase thread
+ if (info->eventBase == nullptr) {
+ std::runtime_error ex(msgstr + errnoValue);
+ info->callback->acceptError(ex);
+ return;
+ }
+
+ if (info->consumer->getQueue()->tryPutMessageNoThrow(std::move(msg))) {
+ return;
+ }
+ // Fall through and try another callback
+
+ if (callbackIndex_ == startingIndex) {
+ // The notification queues for all of the callbacks were full.
+ // We can't really do anything at this point.
+ LOG(ERROR) << "failed to dispatch accept error: all accept callback "
+ "queues are full: error msg: " <<
+ msg.msg.c_str() << errnoValue;
+ return;
+ }
+ info = nextCallback();
+ }
+}
+
+void AsyncServerSocket::enterBackoff() {
+ // If this is the first time we have entered the backoff state,
+ // allocate backoffTimeout_.
+ if (backoffTimeout_ == nullptr) {
+ try {
+ backoffTimeout_ = new BackoffTimeout(this);
+ } catch (const std::bad_alloc& ex) {
+ // Man, we couldn't even allocate the timer to re-enable accepts.
+ // We must be in pretty bad shape. Don't pause accepting for now,
+ // since we won't be able to re-enable ourselves later.
+ LOG(ERROR) << "failed to allocate AsyncServerSocket backoff"
+ << " timer; unable to temporarly pause accepting";
+ return;
+ }
+ }
+
+ // For now, we simply pause accepting for 1 second.
+ //
+ // We could add some smarter backoff calculation here in the future. (e.g.,
+ // start sleeping for longer if we keep hitting the backoff frequently.)
+ // Typically the user needs to figure out why the server is overloaded and
+ // fix it in some other way, though. The backoff timer is just a simple
+ // mechanism to try and give the connection processing code a little bit of
+ // breathing room to catch up, and to avoid just spinning and failing to
+ // accept over and over again.
+ const uint32_t timeoutMS = 1000;
+ if (!backoffTimeout_->scheduleTimeout(timeoutMS)) {
+ LOG(ERROR) << "failed to schedule AsyncServerSocket backoff timer;"
+ << "unable to temporarly pause accepting";
+ return;
+ }
+
+ // The backoff timer is scheduled to re-enable accepts.
+ // Go ahead and disable accepts for now. We leave accepting_ set to true,
+ // since that tracks the desired state requested by the user.
+ for (auto& handler : sockets_) {
+ handler.unregisterHandler();
+ }
+}
+
+void AsyncServerSocket::backoffTimeoutExpired() {
+ // accepting_ should still be true.
+ // If pauseAccepting() was called while in the backoff state it will cancel
+ // the backoff timeout.
+ assert(accepting_);
+ // We can't be detached from the EventBase without being paused
+ assert(eventBase_ != nullptr && eventBase_->isInEventBaseThread());
+
+ // If all of the callbacks were removed, we shouldn't re-enable accepts
+ if (callbacks_.empty()) {
+ return;
+ }
+
+ // Register the handler.
+ for (auto& handler : sockets_) {
+ if (!handler.registerHandler(
+ EventHandler::READ | EventHandler::PERSIST)) {
+ // We're hosed. We could just re-schedule backoffTimeout_ to
+ // re-try again after a little bit. However, we don't want to
+ // loop retrying forever if we can't re-enable accepts. Just
+ // abort the entire program in this state; things are really bad
+ // and restarting the entire server is probably the best remedy.
+ LOG(ERROR)
+ << "failed to re-enable AsyncServerSocket accepts after backoff; "
+ << "crashing now";
+ abort();
+ }
+ }
+}
+
+
+
+} // folly
--- /dev/null
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <folly/io/async/DelayedDestruction.h>
+#include <folly/io/async/EventHandler.h>
+#include <folly/io/async/EventBase.h>
+#include <folly/io/async/NotificationQueue.h>
+#include <folly/io/async/AsyncTimeout.h>
+#include <folly/io/ShutdownSocketSet.h>
+#include <folly/SocketAddress.h>
+#include <memory>
+#include <exception>
+#include <vector>
+#include <limits.h>
+#include <stddef.h>
+#include <sys/socket.h>
+
+namespace folly {
+
+/**
+ * A listening socket that asynchronously informs a callback whenever a new
+ * connection has been accepted.
+ *
+ * Unlike most async interfaces that always invoke their callback in the same
+ * EventBase thread, AsyncServerSocket is unusual in that it can distribute
+ * the callbacks across multiple EventBase threads.
+ *
+ * This supports a common use case for network servers to distribute incoming
+ * connections across a number of EventBase threads. (Servers typically run
+ * with one EventBase thread per CPU.)
+ *
+ * Despite being able to invoke callbacks in multiple EventBase threads,
+ * AsyncServerSocket still has one "primary" EventBase. Operations that
+ * modify the AsyncServerSocket state may only be performed from the primary
+ * EventBase thread.
+ */
+class AsyncServerSocket : public DelayedDestruction {
+ public:
+ typedef std::unique_ptr<AsyncServerSocket, Destructor> UniquePtr;
+
+ class AcceptCallback {
+ public:
+ virtual ~AcceptCallback() {}
+
+ /**
+ * connectionAccepted() is called whenever a new client connection is
+ * received.
+ *
+ * The AcceptCallback will remain installed after connectionAccepted()
+ * returns.
+ *
+ * @param fd The newly accepted client socket. The AcceptCallback
+ * assumes ownership of this socket, and is responsible
+ * for closing it when done. The newly accepted file
+ * descriptor will have already been put into
+ * non-blocking mode.
+ * @param clientAddr A reference to a TSocketAddress struct containing the
+ * client's address. This struct is only guaranteed to
+ * remain valid until connectionAccepted() returns.
+ */
+ virtual void connectionAccepted(int fd,
+ const SocketAddress& clientAddr)
+ noexcept = 0;
+
+ /**
+ * acceptError() is called if an error occurs while accepting.
+ *
+ * The AcceptCallback will remain installed even after an accept error,
+ * as the errors are typically somewhat transient, such as being out of
+ * file descriptors. The server socket must be explicitly stopped if you
+ * wish to stop accepting after an error.
+ *
+ * @param ex An exception representing the error.
+ */
+ virtual void acceptError(const std::exception& ex) noexcept = 0;
+
+ /**
+ * acceptStarted() will be called in the callback's EventBase thread
+ * after this callback has been added to the AsyncServerSocket.
+ *
+ * acceptStarted() will be called before any calls to connectionAccepted()
+ * or acceptError() are made on this callback.
+ *
+ * acceptStarted() makes it easier for callbacks to perform initialization
+ * inside the callback thread. (The call to addAcceptCallback() must
+ * always be made from the AsyncServerSocket's primary EventBase thread.
+ * acceptStarted() provides a hook that will always be invoked in the
+ * callback's thread.)
+ *
+ * Note that the call to acceptStarted() is made once the callback is
+ * added, regardless of whether or not the AsyncServerSocket is actually
+ * accepting at the moment. acceptStarted() will be called even if the
+ * AsyncServerSocket is paused when the callback is added (including if
+ * the initial call to startAccepting() on the AsyncServerSocket has not
+ * been made yet).
+ */
+ virtual void acceptStarted() noexcept {}
+
+ /**
+ * acceptStopped() will be called when this AcceptCallback is removed from
+ * the AsyncServerSocket, or when the AsyncServerSocket is destroyed,
+ * whichever occurs first.
+ *
+ * No more calls to connectionAccepted() or acceptError() will be made
+ * after acceptStopped() is invoked.
+ */
+ virtual void acceptStopped() noexcept {}
+ };
+
+ static const uint32_t kDefaultMaxAcceptAtOnce = 30;
+ static const uint32_t kDefaultCallbackAcceptAtOnce = 5;
+ static const uint32_t kDefaultMaxMessagesInQueue = 0;
+ /**
+ * Create a new AsyncServerSocket with the specified EventBase.
+ *
+ * @param eventBase The EventBase to use for driving the asynchronous I/O.
+ * If this parameter is nullptr, attachEventBase() must be
+ * called before this socket can begin accepting
+ * connections.
+ */
+ explicit AsyncServerSocket(EventBase* eventBase = nullptr);
+
+ /**
+ * Helper function to create a shared_ptr<AsyncServerSocket>.
+ *
+ * This passes in the correct destructor object, since AsyncServerSocket's
+ * destructor is protected and cannot be invoked directly.
+ */
+ static std::shared_ptr<AsyncServerSocket>
+ newSocket(EventBase* evb = nullptr) {
+ return std::shared_ptr<AsyncServerSocket>(new AsyncServerSocket(evb),
+ Destructor());
+ }
+
+ void setShutdownSocketSet(ShutdownSocketSet* newSS);
+
+ /**
+ * Destroy the socket.
+ *
+ * AsyncServerSocket::destroy() must be called to destroy the socket.
+ * The normal destructor is private, and should not be invoked directly.
+ * This prevents callers from deleting a AsyncServerSocket while it is
+ * invoking a callback.
+ *
+ * destroy() must be invoked from the socket's primary EventBase thread.
+ *
+ * If there are AcceptCallbacks still installed when destroy() is called,
+ * acceptStopped() will be called on these callbacks to notify them that
+ * accepting has stopped. Accept callbacks being driven by other EventBase
+ * threads may continue to receive new accept callbacks for a brief period of
+ * time after destroy() returns. They will not receive any more callback
+ * invocations once acceptStopped() is invoked.
+ */
+ virtual void destroy();
+
+ /**
+ * Attach this AsyncServerSocket to its primary EventBase.
+ *
+ * This may only be called if the AsyncServerSocket is not already attached
+ * to a EventBase. The AsyncServerSocket must be attached to a EventBase
+ * before it can begin accepting connections.
+ */
+ void attachEventBase(EventBase *eventBase);
+
+ /**
+ * Detach the AsyncServerSocket from its primary EventBase.
+ *
+ * detachEventBase() may only be called if the AsyncServerSocket is not
+ * currently accepting connections.
+ */
+ void detachEventBase();
+
+ /**
+ * Get the EventBase used by this socket.
+ */
+ EventBase* getEventBase() const {
+ return eventBase_;
+ }
+
+ /**
+ * Create a AsyncServerSocket from an existing socket file descriptor.
+ *
+ * useExistingSocket() will cause the AsyncServerSocket to take ownership of
+ * the specified file descriptor, and use it to listen for new connections.
+ * The AsyncServerSocket will close the file descriptor when it is
+ * destroyed.
+ *
+ * useExistingSocket() must be called before bind() or listen().
+ *
+ * The supplied file descriptor will automatically be put into non-blocking
+ * mode. The caller may have already directly called bind() and possibly
+ * listen on the file descriptor. If so the caller should skip calling the
+ * corresponding AsyncServerSocket::bind() and listen() methods.
+ *
+ * On error a TTransportException will be thrown and the caller will retain
+ * ownership of the file descriptor.
+ */
+ void useExistingSocket(int fd);
+ void useExistingSockets(const std::vector<int>& fds);
+
+ /**
+ * Return the underlying file descriptor
+ */
+ std::vector<int> getSockets() const {
+ std::vector<int> sockets;
+ for (auto& handler : sockets_) {
+ sockets.push_back(handler.socket_);
+ }
+ return sockets;
+ }
+
+ /**
+ * Backwards compatible getSocket, warns if > 1 socket
+ */
+ int getSocket() const {
+ if (sockets_.size() > 1) {
+ VLOG(2) << "Warning: getSocket can return multiple fds, " <<
+ "but getSockets was not called, so only returning the first";
+ }
+ if (sockets_.size() == 0) {
+ return -1;
+ } else {
+ return sockets_[0].socket_;
+ }
+ }
+
+ /**
+ * Bind to the specified address.
+ *
+ * This must be called from the primary EventBase thread.
+ *
+ * Throws TTransportException on error.
+ */
+ virtual void bind(const SocketAddress& address);
+
+ /**
+ * Bind to the specified port.
+ *
+ * This must be called from the primary EventBase thread.
+ *
+ * Throws TTransportException on error.
+ */
+ virtual void bind(uint16_t port);
+
+ /**
+ * Get the local address to which the socket is bound.
+ *
+ * Throws TTransportException on error.
+ */
+ void getAddress(SocketAddress* addressReturn) const;
+
+ /**
+ * Get all the local addresses to which the socket is bound.
+ *
+ * Throws TTransportException on error.
+ */
+ std::vector<SocketAddress> getAddresses() const;
+
+ /**
+ * Begin listening for connections.
+ *
+ * This calls ::listen() with the specified backlog.
+ *
+ * Once listen() is invoked the socket will actually be open so that remote
+ * clients may establish connections. (Clients that attempt to connect
+ * before listen() is called will receive a connection refused error.)
+ *
+ * At least one callback must be set and startAccepting() must be called to
+ * actually begin notifying the accept callbacks of newly accepted
+ * connections. The backlog parameter controls how many connections the
+ * kernel will accept and buffer internally while the accept callbacks are
+ * paused (or if accepting is enabled but the callbacks cannot keep up).
+ *
+ * bind() must be called before calling listen().
+ * listen() must be called from the primary EventBase thread.
+ *
+ * Throws TTransportException on error.
+ */
+ virtual void listen(int backlog);
+
+ /**
+ * Add an AcceptCallback.
+ *
+ * When a new socket is accepted, one of the AcceptCallbacks will be invoked
+ * with the new socket. The AcceptCallbacks are invoked in a round-robin
+ * fashion. This allows the accepted sockets to distributed among a pool of
+ * threads, each running its own EventBase object. This is a common model,
+ * since most asynchronous-style servers typically run one EventBase thread
+ * per CPU.
+ *
+ * The EventBase object associated with each AcceptCallback must be running
+ * its loop. If the EventBase loop is not running, sockets will still be
+ * scheduled for the callback, but the callback cannot actually get invoked
+ * until the loop runs.
+ *
+ * This method must be invoked from the AsyncServerSocket's primary
+ * EventBase thread.
+ *
+ * Note that startAccepting() must be called on the AsyncServerSocket to
+ * cause it to actually start accepting sockets once callbacks have been
+ * installed.
+ *
+ * @param callback The callback to invoke.
+ * @param eventBase The EventBase to use to invoke the callback. This
+ * parameter may be nullptr, in which case the callback will be invoked in
+ * the AsyncServerSocket's primary EventBase.
+ * @param maxAtOnce The maximum number of connections to accept in this
+ * callback on a single iteration of the event base loop.
+ * This only takes effect when eventBase is non-nullptr.
+ * When using a nullptr eventBase for the callback, the
+ * setMaxAcceptAtOnce() method controls how many
+ * connections the main event base will accept at once.
+ */
+ virtual void addAcceptCallback(
+ AcceptCallback *callback,
+ EventBase *eventBase,
+ uint32_t maxAtOnce = kDefaultCallbackAcceptAtOnce);
+
+ /**
+ * Remove an AcceptCallback.
+ *
+ * This allows a single AcceptCallback to be removed from the round-robin
+ * pool.
+ *
+ * This method must be invoked from the AsyncServerSocket's primary
+ * EventBase thread. Use EventBase::runInEventBaseThread() to schedule the
+ * operation in the correct EventBase if your code is not in the server
+ * socket's primary EventBase.
+ *
+ * Given that the accept callback is being driven by a different EventBase,
+ * the AcceptCallback may continue to be invoked for a short period of time
+ * after removeAcceptCallback() returns in this thread. Once the other
+ * EventBase thread receives the notification to stop, it will call
+ * acceptStopped() on the callback to inform it that it is fully stopped and
+ * will not receive any new sockets.
+ *
+ * If the last accept callback is removed while the socket is accepting,
+ * the socket will implicitly pause accepting. If a callback is later added,
+ * it will resume accepting immediately, without requiring startAccepting()
+ * to be invoked.
+ *
+ * @param callback The callback to uninstall.
+ * @param eventBase The EventBase associated with this callback. This must
+ * be the same EventBase that was used when the callback was installed
+ * with addAcceptCallback().
+ */
+ void removeAcceptCallback(AcceptCallback *callback, EventBase *eventBase);
+
+ /**
+ * Begin accepting connctions on this socket.
+ *
+ * bind() and listen() must be called before calling startAccepting().
+ *
+ * When a AsyncServerSocket is initially created, it will not begin
+ * accepting connections until at least one callback has been added and
+ * startAccepting() has been called. startAccepting() can also be used to
+ * resume accepting connections after a call to pauseAccepting().
+ *
+ * If startAccepting() is called when there are no accept callbacks
+ * installed, the socket will not actually begin accepting until an accept
+ * callback is added.
+ *
+ * This method may only be called from the primary EventBase thread.
+ */
+ virtual void startAccepting();
+
+ /**
+ * Pause accepting connections.
+ *
+ * startAccepting() may be called to resume accepting.
+ *
+ * This method may only be called from the primary EventBase thread.
+ * If there are AcceptCallbacks being driven by other EventBase threads they
+ * may continue to receive callbacks for a short period of time after
+ * pauseAccepting() returns.
+ *
+ * Unlike removeAcceptCallback() or destroy(), acceptStopped() will not be
+ * called on the AcceptCallback objects simply due to a temporary pause. If
+ * the server socket is later destroyed while paused, acceptStopped() will be
+ * called all of the installed AcceptCallbacks.
+ */
+ void pauseAccepting();
+
+ /**
+ * Shutdown the listen socket and notify all callbacks that accept has
+ * stopped, but don't close the socket. This invokes shutdown(2) with the
+ * supplied argument. Passing -1 will close the socket now. Otherwise, the
+ * close will be delayed until this object is destroyed.
+ *
+ * Only use this if you have reason to pass special flags to shutdown.
+ * Otherwise just destroy the socket.
+ *
+ * This method has no effect when a ShutdownSocketSet option is used.
+ *
+ * Returns the result of shutdown on sockets_[n-1]
+ */
+ int stopAccepting(int shutdownFlags = -1);
+
+ /**
+ * Get the maximum number of connections that will be accepted each time
+ * around the event loop.
+ */
+ uint32_t getMaxAcceptAtOnce() const {
+ return maxAcceptAtOnce_;
+ }
+
+ /**
+ * Set the maximum number of connections that will be accepted each time
+ * around the event loop.
+ *
+ * This provides a very coarse-grained way of controlling how fast the
+ * AsyncServerSocket will accept connections. If you find that when your
+ * server is overloaded AsyncServerSocket accepts connections more quickly
+ * than your code can process them, you can try lowering this number so that
+ * fewer connections will be accepted each event loop iteration.
+ *
+ * For more explicit control over the accept rate, you can also use
+ * pauseAccepting() to temporarily pause accepting when your server is
+ * overloaded, and then use startAccepting() later to resume accepting.
+ */
+ void setMaxAcceptAtOnce(uint32_t numConns) {
+ maxAcceptAtOnce_ = numConns;
+ }
+
+ /**
+ * Get the maximum number of unprocessed messages which a NotificationQueue
+ * can hold.
+ */
+ uint32_t getMaxNumMessagesInQueue() const {
+ return maxNumMsgsInQueue_;
+ }
+
+ /**
+ * Set the maximum number of unprocessed messages in NotificationQueue.
+ * No new message will be sent to that NotificationQueue if there are more
+ * than such number of unprocessed messages in that queue.
+ *
+ * Only works if called before addAcceptCallback.
+ */
+ void setMaxNumMessagesInQueue(uint32_t num) {
+ maxNumMsgsInQueue_ = num;
+ }
+
+ /**
+ * Get the speed of adjusting connection accept rate.
+ */
+ double getAcceptRateAdjustSpeed() const {
+ return acceptRateAdjustSpeed_;
+ }
+
+ /**
+ * Set the speed of adjusting connection accept rate.
+ */
+ void setAcceptRateAdjustSpeed(double speed) {
+ acceptRateAdjustSpeed_ = speed;
+ }
+
+ /**
+ * Get the number of connections dropped by the AsyncServerSocket
+ */
+ uint64_t getNumDroppedConnections() const {
+ return numDroppedConnections_;
+ }
+
+ /**
+ * Set whether or not SO_KEEPALIVE should be enabled on the server socket
+ * (and thus on all subsequently-accepted connections). By default, keepalive
+ * is enabled.
+ *
+ * Note that TCP keepalive usually only kicks in after the connection has
+ * been idle for several hours. Applications should almost always have their
+ * own, shorter idle timeout.
+ */
+ void setKeepAliveEnabled(bool enabled) {
+ keepAliveEnabled_ = enabled;
+
+ for (auto& handler : sockets_) {
+ if (handler.socket_ < 0) {
+ continue;
+ }
+
+ int val = (enabled) ? 1 : 0;
+ if (setsockopt(handler.socket_, SOL_SOCKET,
+ SO_KEEPALIVE, &val, sizeof(val)) != 0) {
+ LOG(ERROR) << "failed to set SO_KEEPALIVE on async server socket: %s" <<
+ strerror(errno);
+ }
+ }
+ }
+
+ /**
+ * Get whether or not SO_KEEPALIVE is enabled on the server socket.
+ */
+ bool getKeepAliveEnabled() const {
+ return keepAliveEnabled_;
+ }
+
+ /**
+ * Set whether or not the socket should close during exec() (FD_CLOEXEC). By
+ * default, this is enabled
+ */
+ void setCloseOnExec(bool closeOnExec) {
+ closeOnExec_ = closeOnExec;
+ }
+
+ /**
+ * Get whether or not FD_CLOEXEC is enabled on the server socket.
+ */
+ bool getCloseOnExec() const {
+ return closeOnExec_;
+ }
+
+ protected:
+ /**
+ * Protected destructor.
+ *
+ * Invoke destroy() instead to destroy the AsyncServerSocket.
+ */
+ virtual ~AsyncServerSocket();
+
+ private:
+ enum class MessageType {
+ MSG_NEW_CONN = 0,
+ MSG_ERROR = 1
+ };
+
+ struct QueueMessage {
+ MessageType type;
+ int fd;
+ int err;
+ SocketAddress address;
+ std::string msg;
+ };
+
+ /**
+ * A class to receive notifications to invoke AcceptCallback objects
+ * in other EventBase threads.
+ *
+ * A RemoteAcceptor object is created for each AcceptCallback that
+ * is installed in a separate EventBase thread. The RemoteAcceptor
+ * receives notification of new sockets via a NotificationQueue,
+ * and then invokes the AcceptCallback.
+ */
+ class RemoteAcceptor
+ : private NotificationQueue<QueueMessage>::Consumer {
+ public:
+ explicit RemoteAcceptor(AcceptCallback *callback)
+ : callback_(callback) {}
+
+ ~RemoteAcceptor() {}
+
+ void start(EventBase *eventBase, uint32_t maxAtOnce, uint32_t maxInQueue);
+ void stop(EventBase* eventBase, AcceptCallback* callback);
+
+ virtual void messageAvailable(QueueMessage&& message);
+
+ NotificationQueue<QueueMessage>* getQueue() {
+ return &queue_;
+ }
+
+ private:
+ AcceptCallback *callback_;
+
+ NotificationQueue<QueueMessage> queue_;
+ };
+
+ /**
+ * A struct to keep track of the callbacks associated with this server
+ * socket.
+ */
+ struct CallbackInfo {
+ CallbackInfo(AcceptCallback *cb, EventBase *evb)
+ : callback(cb),
+ eventBase(evb),
+ consumer(nullptr) {}
+
+ AcceptCallback *callback;
+ EventBase *eventBase;
+
+ RemoteAcceptor* consumer;
+ };
+
+ class BackoffTimeout;
+
+ virtual void handlerReady(
+ uint16_t events, int socket, sa_family_t family) noexcept;
+
+ int createSocket(int family);
+ void setupSocket(int fd);
+ void dispatchSocket(int socket, SocketAddress&& address);
+ void dispatchError(const char *msg, int errnoValue);
+ void enterBackoff();
+ void backoffTimeoutExpired();
+
+ CallbackInfo* nextCallback() {
+ CallbackInfo* info = &callbacks_[callbackIndex_];
+
+ ++callbackIndex_;
+ if (callbackIndex_ >= callbacks_.size()) {
+ callbackIndex_ = 0;
+ }
+
+ return info;
+ }
+
+ struct ServerEventHandler : public EventHandler {
+ ServerEventHandler(EventBase* eventBase, int socket,
+ AsyncServerSocket* parent,
+ sa_family_t addressFamily)
+ : EventHandler(eventBase, socket)
+ , eventBase_(eventBase)
+ , socket_(socket)
+ , parent_(parent)
+ , addressFamily_(addressFamily) {}
+
+ ServerEventHandler(const ServerEventHandler& other)
+ : EventHandler(other.eventBase_, other.socket_)
+ , eventBase_(other.eventBase_)
+ , socket_(other.socket_)
+ , parent_(other.parent_)
+ , addressFamily_(other.addressFamily_) {}
+
+ ServerEventHandler& operator=(
+ const ServerEventHandler& other) {
+ if (this != &other) {
+ eventBase_ = other.eventBase_;
+ socket_ = other.socket_;
+ parent_ = other.parent_;
+ addressFamily_ = other.addressFamily_;
+
+ detachEventBase();
+ attachEventBase(other.eventBase_);
+ changeHandlerFD(other.socket_);
+ }
+ return *this;
+ }
+
+ // Inherited from EventHandler
+ virtual void handlerReady(uint16_t events) noexcept {
+ parent_->handlerReady(events, socket_, addressFamily_);
+ }
+
+ EventBase* eventBase_;
+ int socket_;
+ AsyncServerSocket* parent_;
+ sa_family_t addressFamily_;
+ };
+
+ EventBase *eventBase_;
+ std::vector<ServerEventHandler> sockets_;
+ std::vector<int> pendingCloseSockets_;
+ bool accepting_;
+ uint32_t maxAcceptAtOnce_;
+ uint32_t maxNumMsgsInQueue_;
+ double acceptRateAdjustSpeed_; //0 to disable auto adjust
+ double acceptRate_;
+ std::chrono::time_point<std::chrono::steady_clock> lastAccepTimestamp_;
+ uint64_t numDroppedConnections_;
+ uint32_t callbackIndex_;
+ BackoffTimeout *backoffTimeout_;
+ std::vector<CallbackInfo> callbacks_;
+ bool keepAliveEnabled_;
+ bool closeOnExec_;
+ ShutdownSocketSet* shutdownSocketSet_;
+};
+
+} // folly