From 4abb5a3ac6d63272621763f94d2ab59bf201af06 Mon Sep 17 00:00:00 2001 From: Vitaly Berov Date: Fri, 13 Oct 2017 09:30:02 -0700 Subject: [PATCH] Replace ShutdownSocketSet to singleton Summary: We recently found out that ShutdownSocketSet consumes 150+MB for our service, which uses duplex channels. The problem is that we create ~1000 of ThriftServers, and each of the creates its own ShutdownSocketSet. In reality, ShutdownSocketSet is only needed to kill all socket's FD in emergency before crash dump is taken, so they don't hand around waiting for crash dump to complete. There is no need to keep a SSS per ThriftServer, singleton should work just fine. There is a problem here, though. Currently a ThriftServer has 'immediateShutdown' method, which kills all sockets from SSS. So, if SSS becomes a singleton, and we have more than one ThriftServer, calling 'immediateShutdown' on one will kill sockets from the other one. First, it's a quite surprising behavior, and second, it complicates unit tests, which emulate thrift servers running in different processes. As a result, 1. ShutdownSocketSet is created as a singleton, but each ThriftServer still keeps weak ptr to it (mostly for unit tests support). 2. replaceShutdownSocketSet method is added to ThriftServer.h, so unit tests could set different SSS for different ThriftServers. 3. method immediateShutdown is removed from ThriftServer, because its behavior would be 'surprising'. There still may be unexpected consequences of this change for the tests because of Singleton, but let's see. Reviewed By: yfeldblum Differential Revision: D6015576 fbshipit-source-id: dab70dbf82d01bcc71bbe063f983e862911ceb24 --- folly/io/ShutdownSocketSet.cpp | 10 +++++ folly/io/ShutdownSocketSet.h | 24 ++++++++++- folly/io/async/AsyncServerSocket.cpp | 64 +++++++++++++++------------- folly/io/async/AsyncServerSocket.h | 4 +- folly/io/async/AsyncSocket.cpp | 35 +++++++++------ folly/io/async/AsyncSocket.h | 4 +- 6 files changed, 92 insertions(+), 49 deletions(-) diff --git a/folly/io/ShutdownSocketSet.cpp b/folly/io/ShutdownSocketSet.cpp index 09988e7a..a64ae49a 100644 --- a/folly/io/ShutdownSocketSet.cpp +++ b/folly/io/ShutdownSocketSet.cpp @@ -22,16 +22,26 @@ #include #include +#include #include namespace folly { +namespace { +struct PrivateTag {}; +folly::Singleton singleton; +} // namespace + ShutdownSocketSet::ShutdownSocketSet(int maxFd) : maxFd_(maxFd), data_(static_cast*>( folly::checkedCalloc(size_t(maxFd), sizeof(std::atomic)))), nullFile_("/dev/null", O_RDWR) {} +std::shared_ptr ShutdownSocketSet::getInstance() { + return singleton.try_get(); +} + void ShutdownSocketSet::add(int fd) { // Silently ignore any fds >= maxFd_, very unlikely DCHECK_GE(fd, 0); diff --git a/folly/io/ShutdownSocketSet.h b/folly/io/ShutdownSocketSet.h index 7d0853c2..9ba145db 100644 --- a/folly/io/ShutdownSocketSet.h +++ b/folly/io/ShutdownSocketSet.h @@ -39,6 +39,10 @@ class ShutdownSocketSet : private boost::noncopyable { */ explicit ShutdownSocketSet(int maxFd = 1 << 18); + // Singleton instance used by all thrift servers. + // May return nullptr on startup/shutdown. + static std::shared_ptr getInstance(); + /** * Add an already open socket to the list of sockets managed by * ShutdownSocketSet. You MUST close the socket by calling @@ -73,8 +77,24 @@ class ShutdownSocketSet : private boost::noncopyable { void shutdown(int fd, bool abortive=false); /** - * Shut down all sockets managed by ShutdownSocketSet. This is - * async-signal-safe and ignores errors. + * Immediate shutdown of all connections. This is a hard-hitting hammer; + * all reads and writes will return errors and no new connections will + * be accepted. + * + * To be used only in dire situations. We're using it from the failure + * signal handler to close all connections quickly, even though the server + * might take multiple seconds to finish crashing. + * + * The optional bool parameter indicates whether to set the active + * connections in to not linger. The effect of that includes RST packets + * being immediately sent to clients which will result + * in errors (and not normal EOF) on the client side. This also causes + * the local (ip, tcp port number) tuple to be reusable immediately, instead + * of having to wait the standard amount of time. For full details see + * the `shutdown` method of `ShutdownSocketSet` (incl. notes about the + * `abortive` parameter). + * + * This is async-signal-safe and ignores errors. */ void shutdownAll(bool abortive=false); diff --git a/folly/io/async/AsyncServerSocket.cpp b/folly/io/async/AsyncServerSocket.cpp index c9f391f0..72f29d0c 100644 --- a/folly/io/async/AsyncServerSocket.cpp +++ b/folly/io/async/AsyncServerSocket.cpp @@ -146,37 +146,42 @@ class AsyncServerSocket::BackoffTimeout : public AsyncTimeout { */ AsyncServerSocket::AsyncServerSocket(EventBase* eventBase) -: eventBase_(eventBase), - accepting_(false), - maxAcceptAtOnce_(kDefaultMaxAcceptAtOnce), - maxNumMsgsInQueue_(kDefaultMaxMessagesInQueue), - acceptRateAdjustSpeed_(0), - acceptRate_(1), - lastAccepTimestamp_(std::chrono::steady_clock::now()), - numDroppedConnections_(0), - callbackIndex_(0), - backoffTimeout_(nullptr), - callbacks_(), - keepAliveEnabled_(true), - closeOnExec_(true), - shutdownSocketSet_(nullptr) { -} - -void AsyncServerSocket::setShutdownSocketSet(ShutdownSocketSet* newSS) { - if (shutdownSocketSet_ == newSS) { + : eventBase_(eventBase), + accepting_(false), + maxAcceptAtOnce_(kDefaultMaxAcceptAtOnce), + maxNumMsgsInQueue_(kDefaultMaxMessagesInQueue), + acceptRateAdjustSpeed_(0), + acceptRate_(1), + lastAccepTimestamp_(std::chrono::steady_clock::now()), + numDroppedConnections_(0), + callbackIndex_(0), + backoffTimeout_(nullptr), + callbacks_(), + keepAliveEnabled_(true), + closeOnExec_(true) {} + +void AsyncServerSocket::setShutdownSocketSet( + const std::weak_ptr& wNewSS) { + const auto newSS = wNewSS.lock(); + const auto shutdownSocketSet = wShutdownSocketSet_.lock(); + + if (shutdownSocketSet == newSS) { return; } - if (shutdownSocketSet_) { + + if (shutdownSocketSet) { for (auto& h : sockets_) { - shutdownSocketSet_->remove(h.socket_); + shutdownSocketSet->remove(h.socket_); } } - shutdownSocketSet_ = newSS; - if (shutdownSocketSet_) { + + if (newSS) { for (auto& h : sockets_) { - shutdownSocketSet_->add(h.socket_); + newSS->add(h.socket_); } } + + wShutdownSocketSet_ = wNewSS; } AsyncServerSocket::~AsyncServerSocket() { @@ -203,8 +208,8 @@ int AsyncServerSocket::stopAccepting(int shutdownFlags) { for (; !sockets_.empty(); sockets_.pop_back()) { auto& handler = sockets_.back(); handler.unregisterHandler(); - if (shutdownSocketSet_) { - shutdownSocketSet_->close(handler.socket_); + if (const auto shutdownSocketSet = wShutdownSocketSet_.lock()) { + shutdownSocketSet->close(handler.socket_); } else if (shutdownFlags >= 0) { result = shutdownNoInt(handler.socket_, shutdownFlags); pendingCloseSockets_.push_back(handler.socket_); @@ -504,8 +509,9 @@ void AsyncServerSocket::bind(uint16_t port) { for (const auto& socket : sockets_) { if (socket.socket_ <= 0) { continue; - } else if (shutdownSocketSet_) { - shutdownSocketSet_->close(socket.socket_); + } else if ( + const auto shutdownSocketSet = wShutdownSocketSet_.lock()) { + shutdownSocketSet->close(socket.socket_); } else { closeNoInt(socket.socket_); } @@ -793,8 +799,8 @@ void AsyncServerSocket::setupSocket(int fd, int family) { } #endif - if (shutdownSocketSet_) { - shutdownSocketSet_->add(fd); + if (const auto shutdownSocketSet = wShutdownSocketSet_.lock()) { + shutdownSocketSet->add(fd); } } diff --git a/folly/io/async/AsyncServerSocket.h b/folly/io/async/AsyncServerSocket.h index 6589c667..0a07d546 100644 --- a/folly/io/async/AsyncServerSocket.h +++ b/folly/io/async/AsyncServerSocket.h @@ -227,7 +227,7 @@ class AsyncServerSocket : public DelayedDestruction Destructor()); } - void setShutdownSocketSet(ShutdownSocketSet* newSS); + void setShutdownSocketSet(const std::weak_ptr& wNewSS); /** * Destroy the socket. @@ -877,7 +877,7 @@ class AsyncServerSocket : public DelayedDestruction bool tfo_{false}; bool noTransparentTls_{false}; uint32_t tfoMaxQueueSize_{0}; - ShutdownSocketSet* shutdownSocketSet_; + std::weak_ptr wShutdownSocketSet_; ConnectionEventCallback* connectionEventCallback_{nullptr}; }; diff --git a/folly/io/async/AsyncSocket.cpp b/folly/io/async/AsyncSocket.cpp index 6d32ca61..7347dc68 100644 --- a/folly/io/async/AsyncSocket.cpp +++ b/folly/io/async/AsyncSocket.cpp @@ -307,7 +307,7 @@ void AsyncSocket::init() { readCallback_ = nullptr; writeReqHead_ = nullptr; writeReqTail_ = nullptr; - shutdownSocketSet_ = nullptr; + wShutdownSocketSet_.reset(); appBytesWritten_ = 0; appBytesReceived_ = 0; sendMsgParamCallback_ = &defaultSendMsgParamsCallback; @@ -336,8 +336,8 @@ int AsyncSocket::detachFd() { << ", events=" << std::hex << eventFlags_ << ")"; // Extract the fd, and set fd_ to -1 first, so closeNow() won't // actually close the descriptor. - if (shutdownSocketSet_) { - shutdownSocketSet_->remove(fd_); + if (const auto socketSet = wShutdownSocketSet_.lock()) { + socketSet->remove(fd_); } int fd = fd_; fd_ = -1; @@ -355,17 +355,24 @@ const folly::SocketAddress& AsyncSocket::anyAddress() { return anyAddress; } -void AsyncSocket::setShutdownSocketSet(ShutdownSocketSet* newSS) { - if (shutdownSocketSet_ == newSS) { +void AsyncSocket::setShutdownSocketSet( + const std::weak_ptr& wNewSS) { + const auto newSS = wNewSS.lock(); + const auto shutdownSocketSet = wShutdownSocketSet_.lock(); + + if (newSS == shutdownSocketSet) { return; } - if (shutdownSocketSet_ && fd_ != -1) { - shutdownSocketSet_->remove(fd_); + + if (shutdownSocketSet && fd_ != -1) { + shutdownSocketSet->remove(fd_); } - shutdownSocketSet_ = newSS; - if (shutdownSocketSet_ && fd_ != -1) { - shutdownSocketSet_->add(fd_); + + if (newSS && fd_ != -1) { + newSS->add(fd_); } + + wShutdownSocketSet_ = wNewSS; } void AsyncSocket::setCloseOnExec() { @@ -420,8 +427,8 @@ void AsyncSocket::connect(ConnectCallback* callback, withAddr("failed to create socket"), errnoCopy); } - if (shutdownSocketSet_) { - shutdownSocketSet_->add(fd_); + if (const auto shutdownSocketSet = wShutdownSocketSet_.lock()) { + shutdownSocketSet->add(fd_); } ioHandler_.changeHandlerFD(fd_); @@ -2685,8 +2692,8 @@ void AsyncSocket::invalidState(WriteCallback* callback) { void AsyncSocket::doClose() { if (fd_ == -1) return; - if (shutdownSocketSet_) { - shutdownSocketSet_->close(fd_); + if (const auto shutdownSocketSet = wShutdownSocketSet_.lock()) { + shutdownSocketSet->close(fd_); } else { ::close(fd_); } diff --git a/folly/io/async/AsyncSocket.h b/folly/io/async/AsyncSocket.h index e99300fb..a4c637bb 100644 --- a/folly/io/async/AsyncSocket.h +++ b/folly/io/async/AsyncSocket.h @@ -222,7 +222,7 @@ class AsyncSocket : virtual public AsyncTransportWrapper { */ explicit AsyncSocket(EventBase* evb); - void setShutdownSocketSet(ShutdownSocketSet* ss); + void setShutdownSocketSet(const std::weak_ptr& wSS); /** * Create a new AsyncSocket and begin the connection process. @@ -1195,7 +1195,7 @@ class AsyncSocket : virtual public AsyncTransportWrapper { ReadCallback* readCallback_; ///< ReadCallback WriteRequest* writeReqHead_; ///< Chain of WriteRequests WriteRequest* writeReqTail_; ///< End of WriteRequest chain - ShutdownSocketSet* shutdownSocketSet_; + std::weak_ptr wShutdownSocketSet_; size_t appBytesReceived_; ///< Num of bytes received from socket size_t appBytesWritten_; ///< Num of bytes written to socket bool isBufferMovable_{false}; -- 2.34.1