From a44155cb1e81ff144d95b7cbf754f29a66818430 Mon Sep 17 00:00:00 2001 From: Dave Watson Date: Thu, 25 Sep 2014 10:17:20 -0700 Subject: [PATCH] move asyncserversocket to folly Summary: Changes: * namespace to folly * some std::chrono replacements * Moves shutdownSocketSet stuff to thriftserver instead of in the socket itself. * Changes exception types. I don't see anywhere that uses the TAsync*SERVER*socket exceptions depending on the TTransport type, but I could be wrong? I don't really know what to do about the exception types unittests still postponed overnight. Test Plan: fbconfig -r thrift; fbmake runtests fbconfig -r proxygen; fbmake runtests Reviewed By: dcsommer@fb.com Subscribers: hphp-diffs@, ps, folly-diffs@, anca, trunkagent, jsedgwick, fugalh, doug, alandau, bmatheny, njormrod FB internal diff: D1579187 --- folly/Makefile.am | 2 + folly/io/async/AsyncServerSocket.cpp | 861 +++++++++++++++++++++++++++ folly/io/async/AsyncServerSocket.h | 682 +++++++++++++++++++++ 3 files changed, 1545 insertions(+) create mode 100644 folly/io/async/AsyncServerSocket.cpp create mode 100644 folly/io/async/AsyncServerSocket.h diff --git a/folly/Makefile.am b/folly/Makefile.am index 240824d5..f1ca4ac5 100644 --- a/folly/Makefile.am +++ b/folly/Makefile.am @@ -130,6 +130,7 @@ nobase_follyinclude_HEADERS = \ io/TypedIOBuf.h \ io/ShutdownSocketSet.h \ io/async/AsyncTimeout.h \ + io/async/AsyncServerSocket.h \ io/async/DelayedDestruction.h \ io/async/EventBase.h \ io/async/EventBaseManager.h \ @@ -256,6 +257,7 @@ libfolly_la_SOURCES = \ io/RecordIO.cpp \ io/ShutdownSocketSet.cpp \ io/async/AsyncTimeout.cpp \ + io/async/AsyncServerSocket.cpp \ io/async/EventBase.cpp \ io/async/EventBaseManager.cpp \ io/async/EventHandler.cpp \ diff --git a/folly/io/async/AsyncServerSocket.cpp b/folly/io/async/AsyncServerSocket.cpp new file mode 100644 index 00000000..7dce10c1 --- /dev/null +++ b/folly/io/async/AsyncServerSocket.cpp @@ -0,0 +1,861 @@ +/* + * Copyright 2014 Facebook, Inc. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef __STDC_FORMAT_MACROS + #define __STDC_FORMAT_MACROS +#endif + +#include + +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +namespace folly { + +const uint32_t AsyncServerSocket::kDefaultMaxAcceptAtOnce; +const uint32_t AsyncServerSocket::kDefaultCallbackAcceptAtOnce; +const uint32_t AsyncServerSocket::kDefaultMaxMessagesInQueue; + +int setCloseOnExec(int fd, int value) { + // Read the current flags + int old_flags = fcntl(fd, F_GETFD, 0); + + // If reading the flags failed, return error indication now + if (old_flags < 0) + return -1; + + // Set just the flag we want to set + int new_flags; + if (value != 0) + new_flags = old_flags | FD_CLOEXEC; + else + new_flags = old_flags & ~FD_CLOEXEC; + + // Store modified flag word in the descriptor + return fcntl(fd, F_SETFD, new_flags); +} + +void AsyncServerSocket::RemoteAcceptor::start( + EventBase* eventBase, uint32_t maxAtOnce, uint32_t maxInQueue) { + setMaxReadAtOnce(maxAtOnce); + queue_.setMaxQueueSize(maxInQueue); + + if (!eventBase->runInEventBaseThread([=](){ + callback_->acceptStarted(); + this->startConsuming(eventBase, &queue_); + })) { + throw std::invalid_argument("unable to start waiting on accept " + "notification queue in the specified " + "EventBase thread"); + } +} + +void AsyncServerSocket::RemoteAcceptor::stop( + EventBase* eventBase, AcceptCallback* callback) { + if 
(!eventBase->runInEventBaseThread([=](){ + callback->acceptStopped(); + delete this; + })) { + throw std::invalid_argument("unable to start waiting on accept " + "notification queue in the specified " + "EventBase thread"); + } +} + +void AsyncServerSocket::RemoteAcceptor::messageAvailable( + QueueMessage&& msg) { + + switch (msg.type) { + case MessageType::MSG_NEW_CONN: + { + callback_->connectionAccepted(msg.fd, msg.address); + break; + } + case MessageType::MSG_ERROR: + { + std::runtime_error ex(msg.msg); + callback_->acceptError(ex); + break; + } + default: + { + LOG(ERROR) << "invalid accept notification message type " + << int(msg.type); + std::runtime_error ex( + "received invalid accept notification message type"); + callback_->acceptError(ex); + } + } +} + +/* + * AsyncServerSocket::BackoffTimeout + */ +class AsyncServerSocket::BackoffTimeout : public AsyncTimeout { + public: + BackoffTimeout(AsyncServerSocket* socket) + : AsyncTimeout(socket->getEventBase()), + socket_(socket) {} + + virtual void timeoutExpired() noexcept { + socket_->backoffTimeoutExpired(); + } + + private: + AsyncServerSocket* socket_; +}; + +/* + * AsyncServerSocket methods + */ + +AsyncServerSocket::AsyncServerSocket(EventBase* eventBase) +: eventBase_(eventBase), + accepting_(false), + maxAcceptAtOnce_(kDefaultMaxAcceptAtOnce), + maxNumMsgsInQueue_(kDefaultMaxMessagesInQueue), + acceptRateAdjustSpeed_(0), + acceptRate_(1), + lastAccepTimestamp_(std::chrono::steady_clock::now()), + numDroppedConnections_(0), + callbackIndex_(0), + backoffTimeout_(nullptr), + callbacks_(), + keepAliveEnabled_(true), + closeOnExec_(true), + shutdownSocketSet_(nullptr) { +} + +void AsyncServerSocket::setShutdownSocketSet(ShutdownSocketSet* newSS) { + if (shutdownSocketSet_ == newSS) { + return; + } + if (shutdownSocketSet_) { + for (auto& h : sockets_) { + shutdownSocketSet_->remove(h.socket_); + } + } + shutdownSocketSet_ = newSS; + if (shutdownSocketSet_) { + for (auto& h : sockets_) { + 
shutdownSocketSet_->add(h.socket_); + } + } +} + +AsyncServerSocket::~AsyncServerSocket() { + assert(callbacks_.empty()); +} + +int AsyncServerSocket::stopAccepting(int shutdownFlags) { + int result = 0; + for (auto& handler : sockets_) { + VLOG(10) << "AsyncServerSocket::stopAccepting " << this << + handler.socket_; + } + assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread()); + + // When destroy is called, unregister and close the socket immediately + accepting_ = false; + + for (auto& handler : sockets_) { + handler.unregisterHandler(); + if (shutdownSocketSet_) { + shutdownSocketSet_->close(handler.socket_); + } else if (shutdownFlags >= 0) { + result = ::shutdown(handler.socket_, shutdownFlags); + pendingCloseSockets_.push_back(handler.socket_); + } else { + ::close(handler.socket_); + } + } + sockets_.clear(); + + // Destroy the backoff timout. This will cancel it if it is running. + delete backoffTimeout_; + backoffTimeout_ = nullptr; + + // Close all of the callback queues to notify them that they are being + // destroyed. No one should access the AsyncServerSocket any more once + // destroy() is called. However, clear out callbacks_ before invoking the + // accept callbacks just in case. This will potentially help us detect the + // bug if one of the callbacks calls addAcceptCallback() or + // removeAcceptCallback(). 
+ std::vector callbacksCopy; + callbacks_.swap(callbacksCopy); + for (std::vector::iterator it = callbacksCopy.begin(); + it != callbacksCopy.end(); + ++it) { + it->consumer->stop(it->eventBase, it->callback); + } + + return result; +} + +void AsyncServerSocket::destroy() { + stopAccepting(); + for (auto s: pendingCloseSockets_) { + ::close(s); + } + // Then call DelayedDestruction::destroy() to take care of + // whether or not we need immediate or delayed destruction + DelayedDestruction::destroy(); +} + +void AsyncServerSocket::attachEventBase(EventBase *eventBase) { + assert(eventBase_ == nullptr); + assert(eventBase->isInEventBaseThread()); + + eventBase_ = eventBase; + for (auto& handler : sockets_) { + handler.attachEventBase(eventBase); + } +} + +void AsyncServerSocket::detachEventBase() { + assert(eventBase_ != nullptr); + assert(eventBase_->isInEventBaseThread()); + assert(!accepting_); + + eventBase_ = nullptr; + for (auto& handler : sockets_) { + handler.detachEventBase(); + } +} + +void AsyncServerSocket::useExistingSockets(const std::vector& fds) { + assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread()); + + if (sockets_.size() > 0) { + throw std::invalid_argument( + "cannot call useExistingSocket() on a " + "AsyncServerSocket that already has a socket"); + } + + for (auto fd: fds) { + // Set addressFamily_ from this socket. + // Note that the socket may not have been bound yet, but + // setFromLocalAddress() will still work and get the correct address family. + // We will update addressFamily_ again anyway if bind() is called later. 
+ SocketAddress address; + address.setFromLocalAddress(fd); + + setupSocket(fd); + sockets_.push_back( + ServerEventHandler(eventBase_, fd, this, address.getFamily())); + sockets_.back().changeHandlerFD(fd); + } +} + +void AsyncServerSocket::useExistingSocket(int fd) { + useExistingSockets({fd}); +} + +void AsyncServerSocket::bind(const SocketAddress& address) { + assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread()); + + // useExistingSocket() may have been called to initialize socket_ already. + // However, in the normal case we need to create a new socket now. + // Don't set socket_ yet, so that socket_ will remain uninitialized if an + // error occurs. + int fd; + if (sockets_.size() == 0) { + fd = createSocket(address.getFamily()); + } else if (sockets_.size() == 1) { + if (address.getFamily() != sockets_[0].addressFamily_) { + throw std::invalid_argument( + "Attempted to bind address to socket with " + "different address family"); + } + fd = sockets_[0].socket_; + } else { + throw std::invalid_argument( + "Attempted to bind to multiple fds"); + } + + // Bind to the socket + sockaddr_storage addrStorage; + address.getAddress(&addrStorage); + sockaddr* saddr = reinterpret_cast(&addrStorage); + if (::bind(fd, saddr, address.getActualSize()) != 0) { + if (sockets_.size() == 0) { + ::close(fd); + } + folly::throwSystemError(errno, + "failed to bind to async server socket: " + + address.describe()); + } + + // Record the address family that we are using, + // so we know how much address space we need to record accepted addresses. 
+
+  // If we just created this socket, update the EventHandler and set socket_
+  if (sockets_.size() == 0) {
+    sockets_.push_back(
+      ServerEventHandler(eventBase_, fd, this, address.getFamily()));
+    sockets_[0].changeHandlerFD(fd);
+  }
+}
+
+/**
+ * Bind one socket per address that getaddrinfo() returns for the wildcard
+ * (AI_PASSIVE, AF_UNSPEC, SOCK_STREAM) address on the given port.
+ *
+ * Each created socket is configured with setupSocket(), AF_INET6 sockets are
+ * additionally marked IPV6_V6ONLY, and the socket is appended to sockets_
+ * before ::bind() is attempted.
+ *
+ * Throws std::invalid_argument if getaddrinfo() fails, reports a ::bind()
+ * failure through folly::throwSystemError(), and throws std::runtime_error
+ * if no socket ended up in sockets_.  A socket() failure other than
+ * EAFNOSUPPORT, or a failing IPV6_V6ONLY setsockopt(), trips a CHECK.
+ *
+ * @param port  Port number to bind on every returned address.
+ */
+void AsyncServerSocket::bind(uint16_t port) {
+  struct addrinfo hints, *res, *res0;
+  char sport[sizeof("65536")];
+
+  memset(&hints, 0, sizeof(hints));
+  hints.ai_family = AF_UNSPEC;
+  hints.ai_socktype = SOCK_STREAM;
+  hints.ai_flags = AI_PASSIVE;
+  snprintf(sport, sizeof(sport), "%u", port);
+
+  if (getaddrinfo(nullptr, sport, &hints, &res0)) {
+    throw std::invalid_argument(
+      "Attempted to bind address to socket with "
+      "bad getaddrinfo");
+  }
+
+  // Release the getaddrinfo() result on every exit path, including throws.
+  folly::ScopeGuard guard = folly::makeGuard([&]{
+      freeaddrinfo(res0);
+    });
+  DCHECK(&guard);
+
+  for (res = res0; res; res = res->ai_next) {
+    int s = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
+    // IPv6/IPv4 may not be supported by the kernel
+    if (s < 0 && errno == EAFNOSUPPORT) {
+      continue;
+    }
+    // A valid descriptor is any value >= 0.  The previous CHECK(s) tested
+    // truthiness, so it accepted the -1 failure value and rejected fd 0.
+    CHECK(s >= 0);
+
+    try {
+      setupSocket(s);
+    } catch (...) {
+      ::close(s);
+      throw;
+    }
+
+    if (res->ai_family == AF_INET6) {
+      int v6only = 1;
+      CHECK(0 == setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY,
+                            &v6only, sizeof(v6only)));
+    }
+
+    SocketAddress address;
+    address.setFromLocalAddress(s);
+
+    sockets_.push_back(
+      ServerEventHandler(eventBase_, s, this, address.getFamily()));
+
+    // Bind to the socket
+    if (::bind(s, res->ai_addr, res->ai_addrlen) != 0) {
+      folly::throwSystemError(
+        errno,
+        "failed to bind to async server socket for port");
+    }
+  }
+  if (sockets_.size() == 0) {
+    throw std::runtime_error(
+      "did not bind any async server socket for port");
+  }
+}
+
+void AsyncServerSocket::listen(int backlog) {
+  assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
+
+  // Start listening
+  for (auto& handler : sockets_) {
+    if (::listen(handler.socket_, backlog) == -1) {
+      folly::throwSystemError(errno,
+                              "failed to listen on async server socket");
+    }
+  }
+}
+
+void AsyncServerSocket::getAddress(SocketAddress* addressReturn) const {
+  CHECK(sockets_.size() >= 1);
+  if (sockets_.size() > 1) {
+    VLOG(2) << "Warning: getAddress can return multiple addresses, " <<
+      "but getAddress was called, so only returning the first";
+  }
+  addressReturn->setFromLocalAddress(sockets_[0].socket_);
+}
+
+std::vector<SocketAddress> AsyncServerSocket::getAddresses()
+    const {
+  CHECK(sockets_.size() >= 1);
+  auto tsaVec = std::vector<SocketAddress>(sockets_.size());
+  auto tsaIter = tsaVec.begin();
+  for (const auto& socket : sockets_) {
+    (tsaIter++)->setFromLocalAddress(socket.socket_);
+  }
+  return tsaVec;
+}
+
+void AsyncServerSocket::addAcceptCallback(AcceptCallback *callback,
+                                          EventBase *eventBase,
+                                          uint32_t maxAtOnce) {
+  assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
+
+  // If this is the first accept callback and we are supposed to be accepting,
+  // start accepting once the callback is installed.
+ bool runStartAccepting = accepting_ && callbacks_.empty(); + + if (!eventBase) { + eventBase = eventBase_; // Run in AsyncServerSocket's eventbase + } + + callbacks_.push_back(CallbackInfo(callback, eventBase)); + + // Start the remote acceptor. + // + // It would be nice if we could avoid starting the remote acceptor if + // eventBase == eventBase_. However, that would cause issues if + // detachEventBase() and attachEventBase() were ever used to change the + // primary EventBase for the server socket. Therefore we require the caller + // to specify a nullptr EventBase if they want to ensure that the callback is + // always invoked in the primary EventBase, and to be able to invoke that + // callback more efficiently without having to use a notification queue. + RemoteAcceptor* acceptor = nullptr; + try { + acceptor = new RemoteAcceptor(callback); + acceptor->start(eventBase, maxAtOnce, maxNumMsgsInQueue_); + } catch (...) { + callbacks_.pop_back(); + delete acceptor; + throw; + } + callbacks_.back().consumer = acceptor; + + // If this is the first accept callback and we are supposed to be accepting, + // start accepting. + if (runStartAccepting) { + startAccepting(); + } +} + +void AsyncServerSocket::removeAcceptCallback(AcceptCallback *callback, + EventBase *eventBase) { + assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread()); + + // Find the matching AcceptCallback. + // We just do a simple linear search; we don't expect removeAcceptCallback() + // to be called frequently, and we expect there to only be a small number of + // callbacks anyway. + std::vector::iterator it = callbacks_.begin(); + uint32_t n = 0; + while (true) { + if (it == callbacks_.end()) { + throw std::runtime_error("AsyncServerSocket::removeAcceptCallback(): " + "accept callback not found"); + } + if (it->callback == callback && + (it->eventBase == eventBase || eventBase == nullptr)) { + break; + } + ++it; + ++n; + } + + // Remove this callback from callbacks_. 
+ // + // Do this before invoking the acceptStopped() callback, in case + // acceptStopped() invokes one of our methods that examines callbacks_. + // + // Save a copy of the CallbackInfo first. + CallbackInfo info(*it); + callbacks_.erase(it); + if (n < callbackIndex_) { + // We removed an element before callbackIndex_. Move callbackIndex_ back + // one step, since things after n have been shifted back by 1. + --callbackIndex_; + } else { + // We removed something at or after callbackIndex_. + // If we removed the last element and callbackIndex_ was pointing at it, + // we need to reset callbackIndex_ to 0. + if (callbackIndex_ >= callbacks_.size()) { + callbackIndex_ = 0; + } + } + + info.consumer->stop(info.eventBase, info.callback); + + // If we are supposed to be accepting but the last accept callback + // was removed, unregister for events until a callback is added. + if (accepting_ && callbacks_.empty()) { + for (auto& handler : sockets_) { + handler.unregisterHandler(); + } + } +} + +void AsyncServerSocket::startAccepting() { + assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread()); + + accepting_ = true; + if (callbacks_.empty()) { + // We can't actually begin accepting if no callbacks are defined. + // Wait until a callback is added to start accepting. + return; + } + + for (auto& handler : sockets_) { + if (!handler.registerHandler( + EventHandler::READ | EventHandler::PERSIST)) { + throw std::runtime_error("failed to register for accept events"); + } + } +} + +void AsyncServerSocket::pauseAccepting() { + assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread()); + accepting_ = false; + for (auto& handler : sockets_) { + handler. 
unregisterHandler(); + } + + // If we were in the accept backoff state, disable the backoff timeout + if (backoffTimeout_) { + backoffTimeout_->cancelTimeout(); + } +} + +int AsyncServerSocket::createSocket(int family) { + int fd = socket(family, SOCK_STREAM, 0); + if (fd == -1) { + folly::throwSystemError(errno, "error creating async server socket"); + } + + try { + setupSocket(fd); + } catch (...) { + ::close(fd); + throw; + } + return fd; +} + +void AsyncServerSocket::setupSocket(int fd) { + // Get the address family + SocketAddress address; + address.setFromLocalAddress(fd); + auto family = address.getFamily(); + + // Put the socket in non-blocking mode + if (fcntl(fd, F_SETFL, O_NONBLOCK) != 0) { + folly::throwSystemError(errno, + "failed to put socket in non-blocking mode"); + } + + // Set reuseaddr to avoid 2MSL delay on server restart + int one = 1; + if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) != 0) { + // This isn't a fatal error; just log an error message and continue + LOG(ERROR) << "failed to set SO_REUSEADDR on async server socket " << errno; + } + + // Set keepalive as desired + int zero = 0; + if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, + (keepAliveEnabled_) ? 
&one : &zero, sizeof(int)) != 0) {
+    LOG(ERROR) << "failed to set SO_KEEPALIVE on async server socket: " <<
+            strerror(errno);
+  }
+
+  // Setup FD_CLOEXEC flag
+  if (closeOnExec_ &&
+      (-1 == folly::setCloseOnExec(fd, closeOnExec_))) {
+    LOG(ERROR) << "failed to set FD_CLOEXEC on async server socket: " <<
+            strerror(errno);
+  }
+
+  // Set TCP nodelay if available, MAC OS X Hack
+  // See http://lists.danga.com/pipermail/memcached/2005-March/001240.html
+#ifndef TCP_NOPUSH
+  if (family != AF_UNIX) {
+    if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one)) != 0) {
+      // This isn't a fatal error; just log an error message and continue
+      LOG(ERROR) << "failed to set TCP_NODELAY on async server socket: " <<
+              strerror(errno);
+    }
+  }
+#endif
+
+  if (shutdownSocketSet_) {
+    shutdownSocketSet_->add(fd);
+  }
+}
+
+/**
+ * Accept pending connections on a listening socket that became readable.
+ *
+ * Accepts at most maxAcceptAtOnce_ connections from fd.  Each accepted
+ * socket is passed to dispatchSocket(); while acceptRate_ is below 1 a
+ * connection may instead be closed and counted in numDroppedConnections_.
+ * Accept failures other than EAGAIN are reported through dispatchError(),
+ * and EMFILE/ENFILE additionally call enterBackoff().
+ *
+ * @param events         Ready-event mask; not examined by this function.
+ * @param fd             Listening socket to accept from.
+ * @param addressFamily  Address family of fd; pre-filled into the peer
+ *                       sockaddr before accept() is called.
+ */
+void AsyncServerSocket::handlerReady(
+  uint16_t events, int fd, sa_family_t addressFamily) noexcept {
+  assert(!callbacks_.empty());
+  DestructorGuard dg(this);
+
+  // Only accept up to maxAcceptAtOnce_ connections at a time,
+  // to avoid starving other I/O handlers using this EventBase.
+  for (uint32_t n = 0; n < maxAcceptAtOnce_; ++n) {
+    SocketAddress address;
+
+    sockaddr_storage addrStorage;
+    socklen_t addrLen = sizeof(addrStorage);
+    sockaddr* saddr = reinterpret_cast<sockaddr*>(&addrStorage);
+
+    // In some cases, accept() doesn't seem to update these correctly.
+    saddr->sa_family = addressFamily;
+    if (addressFamily == AF_UNIX) {
+      addrLen = sizeof(struct sockaddr_un);
+    }
+
+    // Accept a new client socket
+#ifdef SOCK_NONBLOCK
+    int clientSocket = accept4(fd, saddr, &addrLen, SOCK_NONBLOCK);
+#else
+    int clientSocket = accept(fd, saddr, &addrLen);
+#endif
+    // Capture errno before anything else runs.  The clock, rand(), logging
+    // and back-off calls below are all allowed to overwrite it, and it is
+    // only consulted when clientSocket < 0.
+    const int acceptErrno = errno;
+
+    address.setFromSockaddr(saddr, addrLen);
+
+    std::chrono::time_point<std::chrono::steady_clock> nowMs =
+      std::chrono::steady_clock::now();
+    int64_t timeSinceLastAccept = std::max<int64_t>(
+      int64_t(0),
+      nowMs.time_since_epoch().count() -
+      lastAccepTimestamp_.time_since_epoch().count());
+    lastAccepTimestamp_ = nowMs;
+    if (acceptRate_ < 1) {
+      acceptRate_ *= 1 + acceptRateAdjustSpeed_ * timeSinceLastAccept;
+      if (acceptRate_ >= 1) {
+        acceptRate_ = 1;
+      } else if (rand() > acceptRate_ * RAND_MAX) {
+        ++numDroppedConnections_;
+        if (clientSocket >= 0) {
+          ::close(clientSocket);
+        }
+        continue;
+      }
+    }
+
+    if (clientSocket < 0) {
+      if (acceptErrno == EAGAIN) {
+        // No more sockets to accept right now.
+        // Check for this code first, since it's the most common.
+        return;
+      } else if (acceptErrno == EMFILE || acceptErrno == ENFILE) {
+        // We're out of file descriptors. Perhaps we're accepting connections
+        // too quickly. Pause accepting briefly to back off and give the server
+        // a chance to recover.
+        LOG(ERROR) << "accept failed: out of file descriptors; entering accept "
+                "back-off state";
+        enterBackoff();
+
+        // Dispatch the error message
+        dispatchError("accept() failed", acceptErrno);
+      } else {
+        dispatchError("accept() failed", acceptErrno);
+      }
+      return;
+    }
+
+#ifndef SOCK_NONBLOCK
+    // Explicitly set the new connection to non-blocking mode
+    if (fcntl(clientSocket, F_SETFL, O_NONBLOCK) != 0) {
+      // Save the fcntl() error first; ::close() may replace errno.
+      const int fcntlErrno = errno;
+      ::close(clientSocket);
+      dispatchError("failed to set accepted socket to non-blocking mode",
+                    fcntlErrno);
+      return;
+    }
+#endif
+
+    // Inform the callback about the new connection
+    dispatchSocket(clientSocket, std::move(address));
+
+    // If we aren't accepting any more, break out of the loop
+    if (!accepting_ || callbacks_.empty()) {
+      break;
+    }
+  }
+}
+
+void AsyncServerSocket::dispatchSocket(int socket,
+                                        SocketAddress&& address) {
+  uint32_t startingIndex = callbackIndex_;
+
+  // Short circuit if the callback is in the primary EventBase thread
+
+  CallbackInfo *info = nextCallback();
+  if (info->eventBase == nullptr) {
+    info->callback->connectionAccepted(socket, address);
+    return;
+  }
+
+  // Create a message to send over the notification queue
+  QueueMessage msg;
+  msg.type = MessageType::MSG_NEW_CONN;
+  msg.address = std::move(address);
+  msg.fd = socket;
+
+  // Loop until we find a free queue to write to
+  while (true) {
+    if (info->consumer->getQueue()->tryPutMessageNoThrow(std::move(msg))) {
+      // Success! return.
+      return;
+    }
+
+    // We couldn't add to queue. Fall through to below
+
+    ++numDroppedConnections_;
+    if (acceptRateAdjustSpeed_ > 0) {
+      // aggressively decrease accept rate when in trouble
+      static const double kAcceptRateDecreaseSpeed = 0.1;
+      acceptRate_ *= 1 - kAcceptRateDecreaseSpeed;
+    }
+
+
+    if (callbackIndex_ == startingIndex) {
+      // The notification queue was full
+      // We can't really do anything at this point other than close the socket.
+      //
+      // This should only happen if a user's service is behaving extremely
+      // badly and none of the EventBase threads are looping fast enough to
+      // process the incoming connections. If the service is overloaded, it
+      // should use pauseAccepting() to temporarily back off accepting new
+      // connections, before they reach the point where their threads can't
+      // even accept new messages.
+      LOG(ERROR) << "failed to dispatch newly accepted socket:"
+                 << " all accept callback queues are full";
+      ::close(socket);
+      return;
+    }
+
+    info = nextCallback();
+  }
+}
+
+/**
+ * Report an accept-path failure to one accept callback.
+ *
+ * Starts with the callback returned by nextCallback().  A callback whose
+ * eventBase is nullptr has acceptError() invoked on it directly; otherwise a
+ * MSG_ERROR message is offered to its consumer's notification queue.  If the
+ * queue rejects the message the next callback is tried, and when
+ * callbackIndex_ is back at its starting value the error is only logged.
+ *
+ * @param msgstr      Description of the operation that failed.
+ * @param errnoValue  errno value associated with the failure.
+ */
+void AsyncServerSocket::dispatchError(const char *msgstr, int errnoValue) {
+  uint32_t startingIndex = callbackIndex_;
+  CallbackInfo *info = nextCallback();
+
+  // Create a message to send over the notification queue
+  QueueMessage msg;
+  msg.type = MessageType::MSG_ERROR;
+  msg.err = errnoValue;
+  msg.msg = msgstr;
+
+  while (true) {
+    // Short circuit if the callback is in the primary EventBase thread
+    if (info->eventBase == nullptr) {
+      // Concatenate through std::string.  `msgstr + errnoValue` on a
+      // const char* is pointer arithmetic: it skipped errnoValue bytes into
+      // (or past the end of) the literal instead of appending the number.
+      std::runtime_error ex(std::string(msgstr) + std::to_string(errnoValue));
+      info->callback->acceptError(ex);
+      return;
+    }
+
+    if (info->consumer->getQueue()->tryPutMessageNoThrow(std::move(msg))) {
+      return;
+    }
+    // Fall through and try another callback
+
+    if (callbackIndex_ == startingIndex) {
+      // The notification queues for all of the callbacks were full.
+      // We can't really do anything at this point.
+      LOG(ERROR) << "failed to dispatch accept error: all accept callback "
+        "queues are full: error msg:  " <<
+        msg.msg.c_str() << errnoValue;
+      return;
+    }
+    info = nextCallback();
+  }
+}
+
+void AsyncServerSocket::enterBackoff() {
+  // If this is the first time we have entered the backoff state,
+  // allocate backoffTimeout_.
+  if (backoffTimeout_ == nullptr) {
+    try {
+      backoffTimeout_ = new BackoffTimeout(this);
+    } catch (const std::bad_alloc& ex) {
+      // Man, we couldn't even allocate the timer to re-enable accepts.
+      // We must be in pretty bad shape.
Don't pause accepting for now, + // since we won't be able to re-enable ourselves later. + LOG(ERROR) << "failed to allocate AsyncServerSocket backoff" + << " timer; unable to temporarly pause accepting"; + return; + } + } + + // For now, we simply pause accepting for 1 second. + // + // We could add some smarter backoff calculation here in the future. (e.g., + // start sleeping for longer if we keep hitting the backoff frequently.) + // Typically the user needs to figure out why the server is overloaded and + // fix it in some other way, though. The backoff timer is just a simple + // mechanism to try and give the connection processing code a little bit of + // breathing room to catch up, and to avoid just spinning and failing to + // accept over and over again. + const uint32_t timeoutMS = 1000; + if (!backoffTimeout_->scheduleTimeout(timeoutMS)) { + LOG(ERROR) << "failed to schedule AsyncServerSocket backoff timer;" + << "unable to temporarly pause accepting"; + return; + } + + // The backoff timer is scheduled to re-enable accepts. + // Go ahead and disable accepts for now. We leave accepting_ set to true, + // since that tracks the desired state requested by the user. + for (auto& handler : sockets_) { + handler.unregisterHandler(); + } +} + +void AsyncServerSocket::backoffTimeoutExpired() { + // accepting_ should still be true. + // If pauseAccepting() was called while in the backoff state it will cancel + // the backoff timeout. + assert(accepting_); + // We can't be detached from the EventBase without being paused + assert(eventBase_ != nullptr && eventBase_->isInEventBaseThread()); + + // If all of the callbacks were removed, we shouldn't re-enable accepts + if (callbacks_.empty()) { + return; + } + + // Register the handler. + for (auto& handler : sockets_) { + if (!handler.registerHandler( + EventHandler::READ | EventHandler::PERSIST)) { + // We're hosed. We could just re-schedule backoffTimeout_ to + // re-try again after a little bit. 
However, we don't want to + // loop retrying forever if we can't re-enable accepts. Just + // abort the entire program in this state; things are really bad + // and restarting the entire server is probably the best remedy. + LOG(ERROR) + << "failed to re-enable AsyncServerSocket accepts after backoff; " + << "crashing now"; + abort(); + } + } +} + + + +} // folly diff --git a/folly/io/async/AsyncServerSocket.h b/folly/io/async/AsyncServerSocket.h new file mode 100644 index 00000000..449b5a4a --- /dev/null +++ b/folly/io/async/AsyncServerSocket.h @@ -0,0 +1,682 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace folly { + +/** + * A listening socket that asynchronously informs a callback whenever a new + * connection has been accepted. + * + * Unlike most async interfaces that always invoke their callback in the same + * EventBase thread, AsyncServerSocket is unusual in that it can distribute + * the callbacks across multiple EventBase threads. + * + * This supports a common use case for network servers to distribute incoming + * connections across a number of EventBase threads. (Servers typically run + * with one EventBase thread per CPU.) 
+ *
+ * Despite being able to invoke callbacks in multiple EventBase threads,
+ * AsyncServerSocket still has one "primary" EventBase.  Operations that
+ * modify the AsyncServerSocket state may only be performed from the primary
+ * EventBase thread.
+ */
+class AsyncServerSocket : public DelayedDestruction {
+ public:
+  /**
+   * Owning pointer type.  The Destructor deleter is required because the
+   * destructor is protected; see destroy().
+   */
+  typedef std::unique_ptr<AsyncServerSocket, Destructor> UniquePtr;
+
+  class AcceptCallback {
+   public:
+    virtual ~AcceptCallback() = default;
+
+    /**
+     * connectionAccepted() is called whenever a new client connection is
+     * received.
+     *
+     * The AcceptCallback will remain installed after connectionAccepted()
+     * returns.
+     *
+     * @param fd          The newly accepted client socket.  The AcceptCallback
+     *                    assumes ownership of this socket, and is responsible
+     *                    for closing it when done.  The newly accepted file
+     *                    descriptor will have already been put into
+     *                    non-blocking mode.
+     * @param clientAddr  A reference to a SocketAddress containing the
+     *                    client's address.  This object is only guaranteed to
+     *                    remain valid until connectionAccepted() returns.
+     */
+    virtual void connectionAccepted(int fd,
+                                    const SocketAddress& clientAddr)
+      noexcept = 0;
+
+    /**
+     * acceptError() is called if an error occurs while accepting.
+     *
+     * The AcceptCallback will remain installed even after an accept error,
+     * as the errors are typically somewhat transient, such as being out of
+     * file descriptors.  The server socket must be explicitly stopped if you
+     * wish to stop accepting after an error.
+     *
+     * @param ex  An exception representing the error.
+     */
+    virtual void acceptError(const std::exception& ex) noexcept = 0;
+
+    /**
+     * acceptStarted() will be called in the callback's EventBase thread
+     * after this callback has been added to the AsyncServerSocket.
+     *
+     * acceptStarted() will be called before any calls to connectionAccepted()
+     * or acceptError() are made on this callback.
+     *
+     * acceptStarted() makes it easier for callbacks to perform initialization
+     * inside the callback thread.  (The call to addAcceptCallback() must
+     * always be made from the AsyncServerSocket's primary EventBase thread.
+     * acceptStarted() provides a hook that will always be invoked in the
+     * callback's thread.)
+     *
+     * Note that the call to acceptStarted() is made once the callback is
+     * added, regardless of whether or not the AsyncServerSocket is actually
+     * accepting at the moment.  acceptStarted() will be called even if the
+     * AsyncServerSocket is paused when the callback is added (including if
+     * the initial call to startAccepting() on the AsyncServerSocket has not
+     * been made yet).
+     */
+    virtual void acceptStarted() noexcept {}
+
+    /**
+     * acceptStopped() will be called when this AcceptCallback is removed from
+     * the AsyncServerSocket, or when the AsyncServerSocket is destroyed,
+     * whichever occurs first.
+     *
+     * No more calls to connectionAccepted() or acceptError() will be made
+     * after acceptStopped() is invoked.
+     */
+    virtual void acceptStopped() noexcept {}
+  };
+
+  // kDefaultCallbackAcceptAtOnce is the default for the maxAtOnce argument of
+  // addAcceptCallback().  The other two are presumably the initial values of
+  // maxAcceptAtOnce_ and maxNumMsgsInQueue_ (set in the constructor, which is
+  // not visible in this header) -- confirm against the .cpp.
+  static const uint32_t kDefaultMaxAcceptAtOnce = 30;
+  static const uint32_t kDefaultCallbackAcceptAtOnce = 5;
+  static const uint32_t kDefaultMaxMessagesInQueue = 0;
+
+  /**
+   * Create a new AsyncServerSocket with the specified EventBase.
+   *
+   * @param eventBase  The EventBase to use for driving the asynchronous I/O.
+   *                   If this parameter is nullptr, attachEventBase() must be
+   *                   called before this socket can begin accepting
+   *                   connections.
+   */
+  explicit AsyncServerSocket(EventBase* eventBase = nullptr);
+
+  /**
+   * Helper function to create a shared_ptr<AsyncServerSocket>.
+   *
+   * This passes in the correct destructor object, since AsyncServerSocket's
+   * destructor is protected and cannot be invoked directly.
+   */
+  static std::shared_ptr<AsyncServerSocket>
+  newSocket(EventBase* evb = nullptr) {
+    return std::shared_ptr<AsyncServerSocket>(new AsyncServerSocket(evb),
+                                              Destructor());
+  }
+
+  /**
+   * Set the ShutdownSocketSet used with this server socket.
+   *
+   * NOTE(review): the implementation lives in the .cpp; presumably it
+   * (un)registers the listening sockets with the set -- confirm there.
+   */
+  void setShutdownSocketSet(ShutdownSocketSet* newSS);
+
+  /**
+   * Destroy the socket.
+   *
+   * AsyncServerSocket::destroy() must be called to destroy the socket.
+   * The normal destructor is protected, and should not be invoked directly.
+   * This prevents callers from deleting an AsyncServerSocket while it is
+   * invoking a callback.
+   *
+   * destroy() must be invoked from the socket's primary EventBase thread.
+   *
+   * If there are AcceptCallbacks still installed when destroy() is called,
+   * acceptStopped() will be called on these callbacks to notify them that
+   * accepting has stopped.  Accept callbacks being driven by other EventBase
+   * threads may continue to receive new accept callbacks for a brief period of
+   * time after destroy() returns.  They will not receive any more callback
+   * invocations once acceptStopped() is invoked.
+   */
+  virtual void destroy();
+
+  /**
+   * Attach this AsyncServerSocket to its primary EventBase.
+   *
+   * This may only be called if the AsyncServerSocket is not already attached
+   * to an EventBase.  The AsyncServerSocket must be attached to an EventBase
+   * before it can begin accepting connections.
+   */
+  void attachEventBase(EventBase *eventBase);
+
+  /**
+   * Detach the AsyncServerSocket from its primary EventBase.
+   *
+   * detachEventBase() may only be called if the AsyncServerSocket is not
+   * currently accepting connections.
+   */
+  void detachEventBase();
+
+  /**
+   * Get the EventBase used by this socket.
+   */
+  EventBase* getEventBase() const {
+    return eventBase_;
+  }
+
+  /**
+   * Create an AsyncServerSocket from an existing socket file descriptor.
+   *
+   * useExistingSocket() will cause the AsyncServerSocket to take ownership of
+   * the specified file descriptor, and use it to listen for new connections.
+   * The AsyncServerSocket will close the file descriptor when it is
+   * destroyed.
+   *
+   * useExistingSocket() must be called before bind() or listen().
+   *
+   * The supplied file descriptor will automatically be put into non-blocking
+   * mode.  The caller may have already directly called bind() and possibly
+   * listen on the file descriptor.  If so the caller should skip calling the
+   * corresponding AsyncServerSocket::bind() and listen() methods.
+   *
+   * On error a TTransportException will be thrown and the caller will retain
+   * ownership of the file descriptor.
+   *
+   * NOTE(review): the TTransportException references in this header date
+   * from the thrift version of this class; the commit that moved it to folly
+   * changed the exception types, so confirm the type actually thrown against
+   * the .cpp.
+   */
+  void useExistingSocket(int fd);
+
+  /**
+   * Multi-socket variant of useExistingSocket(): adopt every file descriptor
+   * in fds.
+   */
+  void useExistingSockets(const std::vector<int>& fds);
+
+  /**
+   * Return the underlying file descriptors, one per listening socket.
+   */
+  std::vector<int> getSockets() const {
+    std::vector<int> sockets;
+    sockets.reserve(sockets_.size());
+    for (auto& handler : sockets_) {
+      sockets.push_back(handler.socket_);
+    }
+    return sockets;
+  }
+
+  /**
+   * Backwards compatible getSocket, warns if > 1 socket.
+   *
+   * Returns the first socket's file descriptor, or -1 if there are none.
+   */
+  int getSocket() const {
+    if (sockets_.size() > 1) {
+      VLOG(2) << "Warning: getSocket can return multiple fds, " <<
+        "but getSockets was not called, so only returning the first";
+    }
+    if (sockets_.empty()) {
+      return -1;
+    }
+    return sockets_[0].socket_;
+  }
+
+  /**
+   * Bind to the specified address.
+   *
+   * This must be called from the primary EventBase thread.
+   *
+   * Throws TTransportException on error.
+   */
+  virtual void bind(const SocketAddress& address);
+
+  /**
+   * Bind to the specified port.
+   *
+   * This must be called from the primary EventBase thread.
+   *
+   * Throws TTransportException on error.
+   */
+  virtual void bind(uint16_t port);
+
+  /**
+   * Get the local address to which the socket is bound.
+   *
+   * Throws TTransportException on error.
+   */
+  void getAddress(SocketAddress* addressReturn) const;
+
+  /**
+   * Get all the local addresses to which the socket is bound.
+   *
+   * Throws TTransportException on error.
+   */
+  std::vector<SocketAddress> getAddresses() const;
+
+  /**
+   * Begin listening for connections.
+   *
+   * This calls ::listen() with the specified backlog.
+   *
+   * Once listen() is invoked the socket will actually be open so that remote
+   * clients may establish connections.  (Clients that attempt to connect
+   * before listen() is called will receive a connection refused error.)
+   *
+   * At least one callback must be set and startAccepting() must be called to
+   * actually begin notifying the accept callbacks of newly accepted
+   * connections.  The backlog parameter controls how many connections the
+   * kernel will accept and buffer internally while the accept callbacks are
+   * paused (or if accepting is enabled but the callbacks cannot keep up).
+   *
+   * bind() must be called before calling listen().
+   * listen() must be called from the primary EventBase thread.
+   *
+   * Throws TTransportException on error.
+   */
+  virtual void listen(int backlog);
+
+  /**
+   * Add an AcceptCallback.
+   *
+   * When a new socket is accepted, one of the AcceptCallbacks will be invoked
+   * with the new socket.  The AcceptCallbacks are invoked in a round-robin
+   * fashion.  This allows the accepted sockets to be distributed among a pool
+   * of threads, each running its own EventBase object.  This is a common
+   * model, since most asynchronous-style servers typically run one EventBase
+   * thread per CPU.
+   *
+   * The EventBase object associated with each AcceptCallback must be running
+   * its loop.  If the EventBase loop is not running, sockets will still be
+   * scheduled for the callback, but the callback cannot actually get invoked
+   * until the loop runs.
+   *
+   * This method must be invoked from the AsyncServerSocket's primary
+   * EventBase thread.
+   *
+   * Note that startAccepting() must be called on the AsyncServerSocket to
+   * cause it to actually start accepting sockets once callbacks have been
+   * installed.
+   *
+   * @param callback   The callback to invoke.
+   * @param eventBase  The EventBase to use to invoke the callback.  This
+   *     parameter may be nullptr, in which case the callback will be invoked in
+   *     the AsyncServerSocket's primary EventBase.
+   * @param maxAtOnce  The maximum number of connections to accept in this
+   *                   callback on a single iteration of the event base loop.
+   *                   This only takes effect when eventBase is non-nullptr.
+   *                   When using a nullptr eventBase for the callback, the
+   *                   setMaxAcceptAtOnce() method controls how many
+   *                   connections the main event base will accept at once.
+   */
+  virtual void addAcceptCallback(
+    AcceptCallback *callback,
+    EventBase *eventBase,
+    uint32_t maxAtOnce = kDefaultCallbackAcceptAtOnce);
+
+  /**
+   * Remove an AcceptCallback.
+   *
+   * This allows a single AcceptCallback to be removed from the round-robin
+   * pool.
+   *
+   * This method must be invoked from the AsyncServerSocket's primary
+   * EventBase thread.  Use EventBase::runInEventBaseThread() to schedule the
+   * operation in the correct EventBase if your code is not in the server
+   * socket's primary EventBase.
+   *
+   * Given that the accept callback is being driven by a different EventBase,
+   * the AcceptCallback may continue to be invoked for a short period of time
+   * after removeAcceptCallback() returns in this thread.  Once the other
+   * EventBase thread receives the notification to stop, it will call
+   * acceptStopped() on the callback to inform it that it is fully stopped and
+   * will not receive any new sockets.
+   *
+   * If the last accept callback is removed while the socket is accepting,
+   * the socket will implicitly pause accepting.  If a callback is later added,
+   * it will resume accepting immediately, without requiring startAccepting()
+   * to be invoked.
+   *
+   * @param callback   The callback to uninstall.
+   * @param eventBase  The EventBase associated with this callback.  This must
+   *     be the same EventBase that was used when the callback was installed
+   *     with addAcceptCallback().
+   */
+  void removeAcceptCallback(AcceptCallback *callback, EventBase *eventBase);
+
+  /**
+   * Begin accepting connections on this socket.
+   *
+   * bind() and listen() must be called before calling startAccepting().
+   *
+   * When an AsyncServerSocket is initially created, it will not begin
+   * accepting connections until at least one callback has been added and
+   * startAccepting() has been called.  startAccepting() can also be used to
+   * resume accepting connections after a call to pauseAccepting().
+   *
+   * If startAccepting() is called when there are no accept callbacks
+   * installed, the socket will not actually begin accepting until an accept
+   * callback is added.
+   *
+   * This method may only be called from the primary EventBase thread.
+   */
+  virtual void startAccepting();
+
+  /**
+   * Pause accepting connections.
+   *
+   * startAccepting() may be called to resume accepting.
+   *
+   * This method may only be called from the primary EventBase thread.
+   * If there are AcceptCallbacks being driven by other EventBase threads they
+   * may continue to receive callbacks for a short period of time after
+   * pauseAccepting() returns.
+   *
+   * Unlike removeAcceptCallback() or destroy(), acceptStopped() will not be
+   * called on the AcceptCallback objects simply due to a temporary pause.  If
+   * the server socket is later destroyed while paused, acceptStopped() will be
+   * called on all of the installed AcceptCallbacks.
+   */
+  void pauseAccepting();
+
+  /**
+   * Shutdown the listen socket and notify all callbacks that accept has
+   * stopped, but don't close the socket.  This invokes shutdown(2) with the
+   * supplied argument.  Passing -1 will close the socket now.  Otherwise, the
+   * close will be delayed until this object is destroyed.
+   *
+   * Only use this if you have reason to pass special flags to shutdown.
+   * Otherwise just destroy the socket.
+   *
+   * This method has no effect when a ShutdownSocketSet option is used.
+   *
+   * Returns the result of shutdown on sockets_[n-1]
+   */
+  int stopAccepting(int shutdownFlags = -1);
+
+  /**
+   * Get the maximum number of connections that will be accepted each time
+   * around the event loop.
+   */
+  uint32_t getMaxAcceptAtOnce() const {
+    return maxAcceptAtOnce_;
+  }
+
+  /**
+   * Set the maximum number of connections that will be accepted each time
+   * around the event loop.
+   *
+   * This provides a very coarse-grained way of controlling how fast the
+   * AsyncServerSocket will accept connections.  If you find that when your
+   * server is overloaded AsyncServerSocket accepts connections more quickly
+   * than your code can process them, you can try lowering this number so that
+   * fewer connections will be accepted each event loop iteration.
+   *
+   * For more explicit control over the accept rate, you can also use
+   * pauseAccepting() to temporarily pause accepting when your server is
+   * overloaded, and then use startAccepting() later to resume accepting.
+   */
+  void setMaxAcceptAtOnce(uint32_t numConns) {
+    maxAcceptAtOnce_ = numConns;
+  }
+
+  /**
+   * Get the maximum number of unprocessed messages which a NotificationQueue
+   * can hold.
+   */
+  uint32_t getMaxNumMessagesInQueue() const {
+    return maxNumMsgsInQueue_;
+  }
+
+  /**
+   * Set the maximum number of unprocessed messages in NotificationQueue.
+   * No new message will be sent to that NotificationQueue if there are more
+   * than such number of unprocessed messages in that queue.
+   *
+   * Only works if called before addAcceptCallback.
+   */
+  void setMaxNumMessagesInQueue(uint32_t num) {
+    maxNumMsgsInQueue_ = num;
+  }
+
+  /**
+   * Get the speed of adjusting connection accept rate.
+   */
+  double getAcceptRateAdjustSpeed() const {
+    return acceptRateAdjustSpeed_;
+  }
+
+  /**
+   * Set the speed of adjusting connection accept rate.
+   * A value of 0 disables automatic adjustment.
+   */
+  void setAcceptRateAdjustSpeed(double speed) {
+    acceptRateAdjustSpeed_ = speed;
+  }
+
+  /**
+   * Get the number of connections dropped by the AsyncServerSocket
+   */
+  uint64_t getNumDroppedConnections() const {
+    return numDroppedConnections_;
+  }
+
+  /**
+   * Set whether or not SO_KEEPALIVE should be enabled on the server socket
+   * (and thus on all subsequently-accepted connections).  By default, keepalive
+   * is enabled.
+   *
+   * Note that TCP keepalive usually only kicks in after the connection has
+   * been idle for several hours.  Applications should almost always have their
+   * own, shorter idle timeout.
+   *
+   * The option is applied immediately to every socket that is already open
+   * (socket_ >= 0).  A setsockopt() failure is logged and otherwise ignored,
+   * so the remaining sockets are still updated.
+   */
+  void setKeepAliveEnabled(bool enabled) {
+    keepAliveEnabled_ = enabled;
+
+    const int val = enabled ? 1 : 0;
+    for (auto& handler : sockets_) {
+      if (handler.socket_ < 0) {
+        continue;
+      }
+
+      if (setsockopt(handler.socket_, SOL_SOCKET,
+                     SO_KEEPALIVE, &val, sizeof(val)) != 0) {
+        // Stream-style logging does not interpret printf conversions, so the
+        // message must not contain a "%s" placeholder.
+        LOG(ERROR) << "failed to set SO_KEEPALIVE on async server socket: " <<
+          strerror(errno);
+      }
+    }
+  }
+
+  /**
+   * Get whether or not SO_KEEPALIVE is enabled on the server socket.
+   */
+  bool getKeepAliveEnabled() const {
+    return keepAliveEnabled_;
+  }
+
+  /**
+   * Set whether or not the socket should close during exec() (FD_CLOEXEC). By
+   * default, this is enabled
+   */
+  void setCloseOnExec(bool closeOnExec) {
+    closeOnExec_ = closeOnExec;
+  }
+
+  /**
+   * Get whether or not FD_CLOEXEC is enabled on the server socket.
+   */
+  bool getCloseOnExec() const {
+    return closeOnExec_;
+  }
+
+ protected:
+  /**
+   * Protected destructor.
+   *
+   * Invoke destroy() instead to destroy the AsyncServerSocket.
+   */
+  virtual ~AsyncServerSocket();
+
+ private:
+  /** Kind of notification carried by a QueueMessage. */
+  enum class MessageType {
+    MSG_NEW_CONN = 0,
+    MSG_ERROR = 1
+  };
+
+  /**
+   * Message passed through a RemoteAcceptor's NotificationQueue.
+   *
+   * NOTE(review): which fields are meaningful for which MessageType is
+   * decided in the .cpp; presumably fd/address describe a new connection and
+   * err/msg describe an error -- confirm there.
+   */
+  struct QueueMessage {
+    MessageType type;
+    int fd;
+    int err;
+    SocketAddress address;
+    std::string msg;
+  };
+
+  /**
+   * A class to receive notifications to invoke AcceptCallback objects
+   * in other EventBase threads.
+   *
+   * A RemoteAcceptor object is created for each AcceptCallback that
+   * is installed in a separate EventBase thread.  The RemoteAcceptor
+   * receives notification of new sockets via a NotificationQueue,
+   * and then invokes the AcceptCallback.
+   */
+  class RemoteAcceptor
+      : private NotificationQueue<QueueMessage>::Consumer {
+  public:
+    explicit RemoteAcceptor(AcceptCallback *callback)
+      : callback_(callback) {}
+
+    ~RemoteAcceptor() {}
+
+    /**
+     * Apply the maxAtOnce / maxInQueue limits, then schedule, in eventBase's
+     * thread, the acceptStarted() notification followed by the start of
+     * queue consumption.  Throws std::invalid_argument if the work cannot be
+     * scheduled in that thread.
+     */
+    void start(EventBase *eventBase, uint32_t maxAtOnce, uint32_t maxInQueue);
+
+    /** Counterpart of start(); implemented in the .cpp. */
+    void stop(EventBase* eventBase, AcceptCallback* callback);
+
+    /** Consumer hook for a message taken from queue_ (see the .cpp). */
+    virtual void messageAvailable(QueueMessage&& message);
+
+    NotificationQueue<QueueMessage>* getQueue() {
+      return &queue_;
+    }
+
+  private:
+    AcceptCallback *callback_;
+
+    NotificationQueue<QueueMessage> queue_;
+  };
+
+  /**
+   * A struct to keep track of the callbacks associated with this server
+   * socket.
+   */
+  struct CallbackInfo {
+    CallbackInfo(AcceptCallback *cb, EventBase *evb)
+      : callback(cb),
+        eventBase(evb),
+        consumer(nullptr) {}
+
+    AcceptCallback *callback;
+    EventBase *eventBase;
+
+    // Starts out null; assigned outside this header.
+    RemoteAcceptor* consumer;
+  };
+
+  class BackoffTimeout;
+
+  /**
+   * Invoked by a ServerEventHandler when its socket has events pending.
+   *
+   * @param events  The event flags reported by the EventHandler.
+   * @param socket  The file descriptor of the socket that became ready.
+   * @param family  The address family recorded for that socket.
+   */
+  virtual void handlerReady(
+    uint16_t events, int socket, sa_family_t family) noexcept;
+
+  int createSocket(int family);
+  void setupSocket(int fd);
+  void dispatchSocket(int socket, SocketAddress&& address);
+  void dispatchError(const char *msg, int errnoValue);
+  void enterBackoff();
+  void backoffTimeoutExpired();
+
+  /**
+   * Return the callback at callbackIndex_ and advance the index, wrapping
+   * back to 0 once it reaches the end of callbacks_.
+   *
+   * callbacks_ must be non-empty; this is not checked here.
+   */
+  CallbackInfo* nextCallback() {
+    CallbackInfo* info = &callbacks_[callbackIndex_];
+
+    ++callbackIndex_;
+    if (callbackIndex_ >= callbacks_.size()) {
+      callbackIndex_ = 0;
+    }
+
+    return info;
+  }
+
+  /**
+   * EventHandler for a single listening socket.  Readiness events are
+   * forwarded to the owning AsyncServerSocket together with the socket's
+   * file descriptor and address family.
+   *
+   * Copy construction and assignment are spelled out by hand: both rebuild
+   * the EventHandler base from the source's eventBase_ and socket_ rather
+   * than copying the base object itself.
+   */
+  struct ServerEventHandler : public EventHandler {
+    ServerEventHandler(EventBase* eventBase, int socket,
+                       AsyncServerSocket* parent,
+                      sa_family_t addressFamily)
+        : EventHandler(eventBase, socket)
+        , eventBase_(eventBase)
+        , socket_(socket)
+        , parent_(parent)
+        , addressFamily_(addressFamily) {}
+
+    ServerEventHandler(const ServerEventHandler& other)
+    : EventHandler(other.eventBase_, other.socket_)
+    , eventBase_(other.eventBase_)
+    , socket_(other.socket_)
+    , parent_(other.parent_)
+    , addressFamily_(other.addressFamily_) {}
+
+    ServerEventHandler& operator=(
+        const ServerEventHandler& other) {
+      if (this != &other) {
+        eventBase_ = other.eventBase_;
+        socket_ = other.socket_;
+        parent_ = other.parent_;
+        addressFamily_ = other.addressFamily_;
+
+        // Re-point the EventHandler base at the new EventBase and fd.
+        detachEventBase();
+        attachEventBase(other.eventBase_);
+        changeHandlerFD(other.socket_);
+      }
+      return *this;
+    }
+
+    // Inherited from EventHandler
+    virtual void handlerReady(uint16_t events) noexcept {
+      parent_->handlerReady(events, socket_, addressFamily_);
+    }
+
+    EventBase* eventBase_;
+    int socket_;
+    AsyncServerSocket* parent_;
+    sa_family_t addressFamily_;
+  };
+
+  EventBase *eventBase_;                       // primary EventBase
+  std::vector<ServerEventHandler> sockets_;    // one handler per listen socket
+  std::vector<int> pendingCloseSockets_;
+  bool accepting_;
+  uint32_t maxAcceptAtOnce_;
+  uint32_t maxNumMsgsInQueue_;
+  double acceptRateAdjustSpeed_;  //0 to disable auto adjust
+  double acceptRate_;
+  // NOTE(review): the clock type was lost in extraction and is reconstructed
+  // as steady_clock; confirm against the assignment in the .cpp.
+  std::chrono::time_point<std::chrono::steady_clock> lastAccepTimestamp_;
+  uint64_t numDroppedConnections_;
+  uint32_t callbackIndex_;                     // next slot for nextCallback()
+  BackoffTimeout *backoffTimeout_;
+  std::vector<CallbackInfo> callbacks_;
+  bool keepAliveEnabled_;
+  bool closeOnExec_;
+  ShutdownSocketSet* shutdownSocketSet_;
+};
+
+} // folly
-- 
2.34.1