--- /dev/null
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <folly/io/ShutdownSocketSet.h>
+
+#include <sys/socket.h>
+#include <sys/types.h>
+
+#include <chrono>
+#include <thread>
+
+#include <glog/logging.h>
+
+#include <folly/FileUtil.h>
+#include <folly/Malloc.h>
+
+namespace folly {
+
+ShutdownSocketSet::ShutdownSocketSet(size_t maxFd)
+ : maxFd_(maxFd),
+ data_(static_cast<std::atomic<uint8_t>*>(
+ folly::checkedCalloc(maxFd, sizeof(std::atomic<uint8_t>)))),
+ nullFile_("/dev/null", O_RDWR) {
+}
+
+void ShutdownSocketSet::add(int fd) {
+ // Silently ignore any fds >= maxFd_, very unlikely
+ DCHECK_GE(fd, 0);
+ if (fd >= maxFd_) {
+ return;
+ }
+
+ auto& sref = data_[fd];
+ uint8_t prevState = FREE;
+ CHECK(sref.compare_exchange_strong(prevState,
+ IN_USE,
+ std::memory_order_acq_rel))
+ << "Invalid prev state for fd " << fd << ": " << int(prevState);
+}
+
+void ShutdownSocketSet::remove(int fd) {
+ DCHECK_GE(fd, 0);
+ if (fd >= maxFd_) {
+ return;
+ }
+
+ auto& sref = data_[fd];
+ uint8_t prevState = 0;
+
+retry_load:
+ prevState = sref.load(std::memory_order_relaxed);
+
+retry:
+ switch (prevState) {
+ case IN_SHUTDOWN:
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ goto retry_load;
+ case FREE:
+ LOG(FATAL) << "Invalid prev state for fd " << fd << ": " << int(prevState);
+ }
+
+ if (!sref.compare_exchange_weak(prevState,
+ FREE,
+ std::memory_order_acq_rel)) {
+ goto retry;
+ }
+}
+
+int ShutdownSocketSet::close(int fd) {
+ DCHECK_GE(fd, 0);
+ if (fd >= maxFd_) {
+ return folly::closeNoInt(fd);
+ }
+
+ auto& sref = data_[fd];
+ uint8_t prevState = sref.load(std::memory_order_relaxed);
+ uint8_t newState = 0;
+
+retry:
+ switch (prevState) {
+ case IN_USE:
+ case SHUT_DOWN:
+ newState = FREE;
+ break;
+ case IN_SHUTDOWN:
+ newState = MUST_CLOSE;
+ break;
+ default:
+ LOG(FATAL) << "Invalid prev state for fd " << fd << ": " << int(prevState);
+ }
+
+ if (!sref.compare_exchange_weak(prevState,
+ newState,
+ std::memory_order_acq_rel)) {
+ goto retry;
+ }
+
+ return newState == FREE ? folly::closeNoInt(fd) : 0;
+}
+
+void ShutdownSocketSet::shutdown(int fd, bool abortive) {
+ DCHECK_GE(fd, 0);
+ if (fd >= maxFd_) {
+ doShutdown(fd, abortive);
+ return;
+ }
+
+ auto& sref = data_[fd];
+ uint8_t prevState = IN_USE;
+ if (!sref.compare_exchange_strong(prevState,
+ IN_SHUTDOWN,
+ std::memory_order_acq_rel)) {
+ return;
+ }
+
+ doShutdown(fd, abortive);
+
+ prevState = IN_SHUTDOWN;
+ if (sref.compare_exchange_strong(prevState,
+ SHUT_DOWN,
+ std::memory_order_acq_rel)) {
+ return;
+ }
+
+ CHECK_EQ(prevState, MUST_CLOSE)
+ << "Invalid prev state for fd " << fd << ": " << int(prevState);
+
+ folly::closeNoInt(fd); // ignore errors, nothing to do
+
+ CHECK(sref.compare_exchange_strong(prevState,
+ FREE,
+ std::memory_order_acq_rel))
+ << "Invalid prev state for fd " << fd << ": " << int(prevState);
+}
+
+void ShutdownSocketSet::shutdownAll(bool abortive) {
+ for (size_t i = 0; i < maxFd_; ++i) {
+ auto& sref = data_[i];
+ if (sref.load(std::memory_order_acquire) == IN_USE) {
+ shutdown(i, abortive);
+ }
+ }
+}
+
+void ShutdownSocketSet::doShutdown(int fd, bool abortive) {
+ // shutdown() the socket first, to awaken any threads blocked on the fd
+ // (subsequent IO will fail because it's been shutdown); close()ing the
+ // socket does not wake up blockers, see
+ // http://stackoverflow.com/a/3624545/1736339
+ folly::shutdownNoInt(fd, SHUT_RDWR);
+
+ // If abortive shutdown is desired, we'll set the SO_LINGER option on
+ // the socket with a timeout of 0; this will cause RST to be sent on
+ // close.
+ if (abortive) {
+ struct linger l = {1, 0};
+ if (setsockopt(fd, SOL_SOCKET, SO_LINGER, &l, sizeof(l)) != 0) {
+ // Probably not a socket, ignore.
+ return;
+ }
+ }
+
+ // We can't close() the socket, as that would be dangerous; a new file
+ // could be opened and get the same file descriptor, and then code assuming
+ // the old fd would do IO in the wrong place. We'll (atomically) dup2
+ // /dev/null onto the fd instead.
+ folly::dup2NoInt(nullFile_.fd(), fd);
+}
+
+} // namespaces
--- /dev/null
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <atomic>
+#include <cstdlib>
+#include <memory>
+
+#include <boost/noncopyable.hpp>
+
+#include <folly/File.h>
+
+namespace folly {
+
+/**
+ * Set of sockets that allows immediate, take-no-prisoners abort.
+ */
+class ShutdownSocketSet : private boost::noncopyable {
+ public:
+ /**
+ * Create a socket set that can handle file descriptors < maxFd.
+ * The default value (256Ki) is high enough for just about all
+ * applications, even if you increased the number of file descriptors
+ * on your system.
+ */
+ explicit ShutdownSocketSet(size_t maxFd = 1 << 18);
+
+ /**
+ * Add an already open socket to the list of sockets managed by
+ * ShutdownSocketSet. You MUST close the socket by calling
+ * ShutdownSocketSet::close (which will, as a side effect, also handle EINTR
+ * properly) and not by calling close() on the file descriptor.
+ */
+ void add(int fd);
+
+ /**
+ * Remove a socket from the list of sockets managed by ShutdownSocketSet.
+ * Note that remove() might block! (which we lamely implement by
+ * sleep()-ing) in the extremely rare case that the fd is currently
+ * being shutdown().
+ */
+ void remove(int fd);
+
+ /**
+ * Close a socket managed by ShutdownSocketSet. Returns the same return code
+ * as ::close() (and sets errno accordingly).
+ */
+ int close(int fd);
+
+ /**
+ * Shut down a socket. If abortive is true, we perform an abortive
+ * shutdown (send RST rather than FIN). Note that we might still end up
+ * sending FIN due to the rather interesting implementation.
+ *
+ * This is async-signal-safe and ignores errors. Obviously, subsequent
+ * read() and write() operations to the socket will fail. During normal
+ * operation, just call ::shutdown() on the socket.
+ */
+ void shutdown(int fd, bool abortive=false);
+
+ /**
+ * Shut down all sockets managed by ShutdownSocketSet. This is
+ * async-signal-safe and ignores errors.
+ */
+ void shutdownAll(bool abortive=false);
+
+ private:
+ void doShutdown(int fd, bool abortive);
+
+ // State transitions:
+ // add():
+ // FREE -> IN_USE
+ //
+ // close():
+ // IN_USE -> (::close()) -> FREE
+ // SHUT_DOWN -> (::close()) -> FREE
+ // IN_SHUTDOWN -> MUST_CLOSE
+ // (If the socket is currently being shut down, we must defer the
+ // ::close() until the shutdown completes)
+ //
+ // shutdown():
+ // IN_USE -> IN_SHUTDOWN
+ // (::shutdown())
+ // IN_SHUTDOWN -> SHUT_DOWN
+ // MUST_CLOSE -> (::close()) -> FREE
+ enum State : uint8_t {
+ FREE = 0,
+ IN_USE,
+ IN_SHUTDOWN,
+ SHUT_DOWN,
+ MUST_CLOSE
+ };
+
+ struct Free {
+ template <class T>
+ void operator()(T* ptr) const {
+ ::free(ptr);
+ }
+ };
+
+ const size_t maxFd_;
+ std::unique_ptr<std::atomic<uint8_t>[], Free> data_;
+ folly::File nullFile_;
+};
+
+} // namespaces