Summary:
We recently found out that ShutdownSocketSet consumes 150+ MB in our service, which uses duplex channels. The problem is that we create ~1000 ThriftServers, and each of them creates its own ShutdownSocketSet.
In reality, ShutdownSocketSet is only needed to kill all socket FDs in an emergency before a crash dump is taken, so they don't hang around waiting for the crash dump to complete. There is no need to keep a SSS per ThriftServer; a singleton should work just fine.
There is a problem here, though. Currently ThriftServer has an 'immediateShutdown' method, which kills all sockets in its SSS. So, if SSS becomes a singleton and we have more than one ThriftServer, calling 'immediateShutdown' on one will kill the sockets of the other. First, that is quite surprising behavior, and second, it complicates unit tests, which emulate thrift servers running in different processes.
As a result,
1. ShutdownSocketSet is created as a singleton, but each ThriftServer still keeps a weak ptr to it (mostly to support unit tests).
2. A replaceShutdownSocketSet method is added to ThriftServer.h, so unit tests can set a different SSS for each ThriftServer (see the usage sketch below).
3. The immediateShutdown method is removed from ThriftServer, because its behavior would be 'surprising'.
There may still be unexpected consequences of this change for the tests because of the Singleton, but let's see.
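
A rough usage sketch of the resulting API. Assumptions: the exact signature of ThriftServer::replaceShutdownSocketSet is not shown in this diff, include paths are indicative, and the helper functions below are hypothetical:

  #include <memory>
  #include <folly/ShutdownSocketSet.h>
  #include <folly/io/async/AsyncServerSocket.h>
  #include <thrift/lib/cpp2/server/ThriftServer.h>

  // Production path: sockets attach to the process-wide singleton.
  // getInstance() may return nullptr during startup/shutdown.
  void attachToGlobalSet(folly::AsyncServerSocket& socket) {
    if (auto sss = folly::ShutdownSocketSet::getInstance()) {
      socket.setShutdownSocketSet(sss); // stored internally as a weak_ptr
    }
  }

  // Unit tests that emulate servers in different processes give each server
  // its own private set, so shutting one down cannot touch the other's fds.
  void isolateForTest(apache::thrift::ThriftServer& server) {
    auto privateSet = std::make_shared<folly::ShutdownSocketSet>();
    server.replaceShutdownSocketSet(privateSet); // assumed to take a shared_ptr
  }
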
Reviewed By: yfeldblum
Differential Revision: D6015576
fbshipit-source-id: dab70dbf82d01bcc71bbe063f983e862911ceb24
#include <glog/logging.h>
#include <folly/FileUtil.h>
+#include <folly/Singleton.h>
#include <folly/portability/Sockets.h>
namespace folly {
+namespace {
+struct PrivateTag {};
+folly::Singleton<folly::ShutdownSocketSet, PrivateTag> singleton;
+} // namespace
+
ShutdownSocketSet::ShutdownSocketSet(int maxFd)
: maxFd_(maxFd),
data_(static_cast<std::atomic<uint8_t>*>(
folly::checkedCalloc(size_t(maxFd), sizeof(std::atomic<uint8_t>)))),
nullFile_("/dev/null", O_RDWR) {}
+std::shared_ptr<ShutdownSocketSet> ShutdownSocketSet::getInstance() {
+ return singleton.try_get();
+}
+
void ShutdownSocketSet::add(int fd) {
// Silently ignore any fds >= maxFd_, very unlikely
DCHECK_GE(fd, 0);
*/
explicit ShutdownSocketSet(int maxFd = 1 << 18);
+ // Singleton instance used by all thrift servers.
+ // May return nullptr on startup/shutdown.
+ static std::shared_ptr<ShutdownSocketSet> getInstance();
+
/**
* Add an already open socket to the list of sockets managed by
* ShutdownSocketSet. You MUST close the socket by calling
void shutdown(int fd, bool abortive=false);
/**
- * Shut down all sockets managed by ShutdownSocketSet. This is
- * async-signal-safe and ignores errors.
+ * Immediate shutdown of all connections. This is a hard-hitting hammer;
+ * all reads and writes will return errors and no new connections will
+ * be accepted.
+ *
+ * To be used only in dire situations. We're using it from the failure
+ * signal handler to close all connections quickly, even though the server
+ * might take multiple seconds to finish crashing.
+ *
+ * The optional bool parameter indicates whether to put the active
+ * connections into a non-lingering state. The effect of that includes RST
+ * packets being immediately sent to clients, which will result in errors
+ * (and not normal EOF) on the client side. This also causes
+ * the local (ip, tcp port number) tuple to be reusable immediately, instead
+ * of having to wait the standard amount of time. For full details see
+ * the `shutdown` method of `ShutdownSocketSet` (incl. notes about the
+ * `abortive` parameter).
+ *
+ * This is async-signal-safe and ignores errors.
*/
void shutdownAll(bool abortive=false);
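
For context (an illustrative sketch, not code from this diff): the abortive path boils down to setting SO_LINGER with a zero timeout on each socket, so that tearing the socket down emits an RST instead of a normal FIN. Simplified; the real ShutdownSocketSet also keeps the descriptor number alive by dup2-ing /dev/null over it rather than closing it:

  #include <sys/socket.h>
  #include <unistd.h>

  void abortiveClose(int fd) {
    struct linger l;
    l.l_onoff = 1;  // enable SO_LINGER
    l.l_linger = 0; // zero timeout: teardown sends RST rather than a normal FIN
    setsockopt(fd, SOL_SOCKET, SO_LINGER, &l, sizeof(l));
    close(fd);
  }
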
*/
AsyncServerSocket::AsyncServerSocket(EventBase* eventBase)
-: eventBase_(eventBase),
- accepting_(false),
- maxAcceptAtOnce_(kDefaultMaxAcceptAtOnce),
- maxNumMsgsInQueue_(kDefaultMaxMessagesInQueue),
- acceptRateAdjustSpeed_(0),
- acceptRate_(1),
- lastAccepTimestamp_(std::chrono::steady_clock::now()),
- numDroppedConnections_(0),
- callbackIndex_(0),
- backoffTimeout_(nullptr),
- callbacks_(),
- keepAliveEnabled_(true),
- closeOnExec_(true),
- shutdownSocketSet_(nullptr) {
-}
-
-void AsyncServerSocket::setShutdownSocketSet(ShutdownSocketSet* newSS) {
- if (shutdownSocketSet_ == newSS) {
+ : eventBase_(eventBase),
+ accepting_(false),
+ maxAcceptAtOnce_(kDefaultMaxAcceptAtOnce),
+ maxNumMsgsInQueue_(kDefaultMaxMessagesInQueue),
+ acceptRateAdjustSpeed_(0),
+ acceptRate_(1),
+ lastAccepTimestamp_(std::chrono::steady_clock::now()),
+ numDroppedConnections_(0),
+ callbackIndex_(0),
+ backoffTimeout_(nullptr),
+ callbacks_(),
+ keepAliveEnabled_(true),
+ closeOnExec_(true) {}
+
+void AsyncServerSocket::setShutdownSocketSet(
+ const std::weak_ptr<ShutdownSocketSet>& wNewSS) {
+ const auto newSS = wNewSS.lock();
+ const auto shutdownSocketSet = wShutdownSocketSet_.lock();
+
+ if (shutdownSocketSet == newSS) {
return;
}
- if (shutdownSocketSet_) {
+
+ if (shutdownSocketSet) {
for (auto& h : sockets_) {
- shutdownSocketSet_->remove(h.socket_);
+ shutdownSocketSet->remove(h.socket_);
}
}
- shutdownSocketSet_ = newSS;
- if (shutdownSocketSet_) {
+
+ if (newSS) {
for (auto& h : sockets_) {
- shutdownSocketSet_->add(h.socket_);
+ newSS->add(h.socket_);
}
}
+
+ wShutdownSocketSet_ = wNewSS;
}
AsyncServerSocket::~AsyncServerSocket() {
for (; !sockets_.empty(); sockets_.pop_back()) {
auto& handler = sockets_.back();
handler.unregisterHandler();
- if (shutdownSocketSet_) {
- shutdownSocketSet_->close(handler.socket_);
+ if (const auto shutdownSocketSet = wShutdownSocketSet_.lock()) {
+ shutdownSocketSet->close(handler.socket_);
} else if (shutdownFlags >= 0) {
result = shutdownNoInt(handler.socket_, shutdownFlags);
pendingCloseSockets_.push_back(handler.socket_);
for (const auto& socket : sockets_) {
if (socket.socket_ <= 0) {
continue;
- } else if (shutdownSocketSet_) {
- shutdownSocketSet_->close(socket.socket_);
+ } else if (
+ const auto shutdownSocketSet = wShutdownSocketSet_.lock()) {
+ shutdownSocketSet->close(socket.socket_);
} else {
closeNoInt(socket.socket_);
}
}
#endif
- if (shutdownSocketSet_) {
- shutdownSocketSet_->add(fd);
+ if (const auto shutdownSocketSet = wShutdownSocketSet_.lock()) {
+ shutdownSocketSet->add(fd);
}
}
Destructor());
}
- void setShutdownSocketSet(ShutdownSocketSet* newSS);
+ void setShutdownSocketSet(const std::weak_ptr<ShutdownSocketSet>& wNewSS);
/**
* Destroy the socket.
bool tfo_{false};
bool noTransparentTls_{false};
uint32_t tfoMaxQueueSize_{0};
- ShutdownSocketSet* shutdownSocketSet_;
+ std::weak_ptr<ShutdownSocketSet> wShutdownSocketSet_;
ConnectionEventCallback* connectionEventCallback_{nullptr};
};
readCallback_ = nullptr;
writeReqHead_ = nullptr;
writeReqTail_ = nullptr;
- shutdownSocketSet_ = nullptr;
+ wShutdownSocketSet_.reset();
appBytesWritten_ = 0;
appBytesReceived_ = 0;
sendMsgParamCallback_ = &defaultSendMsgParamsCallback;
<< ", events=" << std::hex << eventFlags_ << ")";
// Extract the fd, and set fd_ to -1 first, so closeNow() won't
// actually close the descriptor.
- if (shutdownSocketSet_) {
- shutdownSocketSet_->remove(fd_);
+ if (const auto socketSet = wShutdownSocketSet_.lock()) {
+ socketSet->remove(fd_);
}
int fd = fd_;
fd_ = -1;
return anyAddress;
}
-void AsyncSocket::setShutdownSocketSet(ShutdownSocketSet* newSS) {
- if (shutdownSocketSet_ == newSS) {
+void AsyncSocket::setShutdownSocketSet(
+ const std::weak_ptr<ShutdownSocketSet>& wNewSS) {
+ const auto newSS = wNewSS.lock();
+ const auto shutdownSocketSet = wShutdownSocketSet_.lock();
+
+ if (newSS == shutdownSocketSet) {
return;
}
- if (shutdownSocketSet_ && fd_ != -1) {
- shutdownSocketSet_->remove(fd_);
+
+ if (shutdownSocketSet && fd_ != -1) {
+ shutdownSocketSet->remove(fd_);
}
- shutdownSocketSet_ = newSS;
- if (shutdownSocketSet_ && fd_ != -1) {
- shutdownSocketSet_->add(fd_);
+
+ if (newSS && fd_ != -1) {
+ newSS->add(fd_);
}
+
+ wShutdownSocketSet_ = wNewSS;
}
void AsyncSocket::setCloseOnExec() {
withAddr("failed to create socket"),
errnoCopy);
}
- if (shutdownSocketSet_) {
- shutdownSocketSet_->add(fd_);
+ if (const auto shutdownSocketSet = wShutdownSocketSet_.lock()) {
+ shutdownSocketSet->add(fd_);
}
ioHandler_.changeHandlerFD(fd_);
void AsyncSocket::doClose() {
if (fd_ == -1) return;
- if (shutdownSocketSet_) {
- shutdownSocketSet_->close(fd_);
+ if (const auto shutdownSocketSet = wShutdownSocketSet_.lock()) {
+ shutdownSocketSet->close(fd_);
} else {
::close(fd_);
}
*/
explicit AsyncSocket(EventBase* evb);
- void setShutdownSocketSet(ShutdownSocketSet* ss);
+ void setShutdownSocketSet(const std::weak_ptr<ShutdownSocketSet>& wSS);
/**
* Create a new AsyncSocket and begin the connection process.
ReadCallback* readCallback_; ///< ReadCallback
WriteRequest* writeReqHead_; ///< Chain of WriteRequests
WriteRequest* writeReqTail_; ///< End of WriteRequest chain
- ShutdownSocketSet* shutdownSocketSet_;
+ std::weak_ptr<ShutdownSocketSet> wShutdownSocketSet_;
size_t appBytesReceived_; ///< Num of bytes received from socket
size_t appBytesWritten_; ///< Num of bytes written to socket
bool isBufferMovable_{false};