From: Dave Watson Date: Fri, 3 Oct 2014 19:05:37 +0000 (-0700) Subject: move shutdown socket set X-Git-Tag: v0.22.0~268 X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=71431a937ae36f689b87d17e2f466d7e703b4174;p=folly.git move shutdown socket set Summary: Move shutdownsocketset to folly, since it is a dep of the asyncsockets Previoulsy tried moving it in to the server directly: D1583629, but had issues - close(fd) is called before the error callback, so we can't remove the fd before the close, which is essential to it working properly. Just move it to folly instead. Test Plan: fbconfig -r thrift/lib/cpp thrift/lib/cpp2; fbmake runtests Reviewed By: dcsommer@fb.com Subscribers: mshneer, trunkagent, fugalh, jsedgwick, doug, alandau, bmatheny, njormrod FB internal diff: D1594950 --- diff --git a/folly/Makefile.am b/folly/Makefile.am index 24ed0e2b..a2f4b722 100644 --- a/folly/Makefile.am +++ b/folly/Makefile.am @@ -113,6 +113,7 @@ nobase_follyinclude_HEADERS = \ io/RecordIO.h \ io/RecordIO-inl.h \ io/TypedIOBuf.h \ + io/ShutdownSocketSet.h \ io/async/AsyncTimeout.h \ io/async/DelayedDestruction.h \ io/async/EventBase.h \ @@ -238,6 +239,7 @@ libfolly_la_SOURCES = \ io/IOBuf.cpp \ io/IOBufQueue.cpp \ io/RecordIO.cpp \ + io/ShutdownSocketSet.cpp \ io/async/AsyncTimeout.cpp \ io/async/EventBase.cpp \ io/async/EventBaseManager.cpp \ diff --git a/folly/io/ShutdownSocketSet.cpp b/folly/io/ShutdownSocketSet.cpp new file mode 100644 index 00000000..52125c25 --- /dev/null +++ b/folly/io/ShutdownSocketSet.cpp @@ -0,0 +1,183 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include +#include + +#include +#include + +#include + +#include +#include + +namespace folly { + +ShutdownSocketSet::ShutdownSocketSet(size_t maxFd) + : maxFd_(maxFd), + data_(static_cast*>( + folly::checkedCalloc(maxFd, sizeof(std::atomic)))), + nullFile_("/dev/null", O_RDWR) { +} + +void ShutdownSocketSet::add(int fd) { + // Silently ignore any fds >= maxFd_, very unlikely + DCHECK_GE(fd, 0); + if (fd >= maxFd_) { + return; + } + + auto& sref = data_[fd]; + uint8_t prevState = FREE; + CHECK(sref.compare_exchange_strong(prevState, + IN_USE, + std::memory_order_acq_rel)) + << "Invalid prev state for fd " << fd << ": " << int(prevState); +} + +void ShutdownSocketSet::remove(int fd) { + DCHECK_GE(fd, 0); + if (fd >= maxFd_) { + return; + } + + auto& sref = data_[fd]; + uint8_t prevState = 0; + +retry_load: + prevState = sref.load(std::memory_order_relaxed); + +retry: + switch (prevState) { + case IN_SHUTDOWN: + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + goto retry_load; + case FREE: + LOG(FATAL) << "Invalid prev state for fd " << fd << ": " << int(prevState); + } + + if (!sref.compare_exchange_weak(prevState, + FREE, + std::memory_order_acq_rel)) { + goto retry; + } +} + +int ShutdownSocketSet::close(int fd) { + DCHECK_GE(fd, 0); + if (fd >= maxFd_) { + return folly::closeNoInt(fd); + } + + auto& sref = data_[fd]; + uint8_t prevState = sref.load(std::memory_order_relaxed); + uint8_t newState = 0; + +retry: + switch (prevState) { + case IN_USE: + case SHUT_DOWN: + newState = FREE; + break; + case IN_SHUTDOWN: + newState = MUST_CLOSE; + break; + default: + LOG(FATAL) << "Invalid prev state for fd " << fd << ": " << int(prevState); + } + + if (!sref.compare_exchange_weak(prevState, + newState, + std::memory_order_acq_rel)) { + goto retry; + } + + return newState == FREE ? folly::closeNoInt(fd) : 0; +} + +void ShutdownSocketSet::shutdown(int fd, bool abortive) { + DCHECK_GE(fd, 0); + if (fd >= maxFd_) { + doShutdown(fd, abortive); + return; + } + + auto& sref = data_[fd]; + uint8_t prevState = IN_USE; + if (!sref.compare_exchange_strong(prevState, + IN_SHUTDOWN, + std::memory_order_acq_rel)) { + return; + } + + doShutdown(fd, abortive); + + prevState = IN_SHUTDOWN; + if (sref.compare_exchange_strong(prevState, + SHUT_DOWN, + std::memory_order_acq_rel)) { + return; + } + + CHECK_EQ(prevState, MUST_CLOSE) + << "Invalid prev state for fd " << fd << ": " << int(prevState); + + folly::closeNoInt(fd); // ignore errors, nothing to do + + CHECK(sref.compare_exchange_strong(prevState, + FREE, + std::memory_order_acq_rel)) + << "Invalid prev state for fd " << fd << ": " << int(prevState); +} + +void ShutdownSocketSet::shutdownAll(bool abortive) { + for (size_t i = 0; i < maxFd_; ++i) { + auto& sref = data_[i]; + if (sref.load(std::memory_order_acquire) == IN_USE) { + shutdown(i, abortive); + } + } +} + +void ShutdownSocketSet::doShutdown(int fd, bool abortive) { + // shutdown() the socket first, to awaken any threads blocked on the fd + // (subsequent IO will fail because it's been shutdown); close()ing the + // socket does not wake up blockers, see + // http://stackoverflow.com/a/3624545/1736339 + folly::shutdownNoInt(fd, SHUT_RDWR); + + // If abortive shutdown is desired, we'll set the SO_LINGER option on + // the socket with a timeout of 0; this will cause RST to be sent on + // close. + if (abortive) { + struct linger l = {1, 0}; + if (setsockopt(fd, SOL_SOCKET, SO_LINGER, &l, sizeof(l)) != 0) { + // Probably not a socket, ignore. + return; + } + } + + // We can't close() the socket, as that would be dangerous; a new file + // could be opened and get the same file descriptor, and then code assuming + // the old fd would do IO in the wrong place. We'll (atomically) dup2 + // /dev/null onto the fd instead. + folly::dup2NoInt(nullFile_.fd(), fd); +} + +} // namespaces diff --git a/folly/io/ShutdownSocketSet.h b/folly/io/ShutdownSocketSet.h new file mode 100644 index 00000000..a395e13a --- /dev/null +++ b/folly/io/ShutdownSocketSet.h @@ -0,0 +1,120 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include + +#include + +#include + +namespace folly { + +/** + * Set of sockets that allows immediate, take-no-prisoners abort. + */ +class ShutdownSocketSet : private boost::noncopyable { + public: + /** + * Create a socket set that can handle file descriptors < maxFd. + * The default value (256Ki) is high enough for just about all + * applications, even if you increased the number of file descriptors + * on your system. + */ + explicit ShutdownSocketSet(size_t maxFd = 1 << 18); + + /** + * Add an already open socket to the list of sockets managed by + * ShutdownSocketSet. You MUST close the socket by calling + * ShutdownSocketSet::close (which will, as a side effect, also handle EINTR + * properly) and not by calling close() on the file descriptor. + */ + void add(int fd); + + /** + * Remove a socket from the list of sockets managed by ShutdownSocketSet. + * Note that remove() might block! (which we lamely implement by + * sleep()-ing) in the extremely rare case that the fd is currently + * being shutdown(). + */ + void remove(int fd); + + /** + * Close a socket managed by ShutdownSocketSet. Returns the same return code + * as ::close() (and sets errno accordingly). + */ + int close(int fd); + + /** + * Shut down a socket. If abortive is true, we perform an abortive + * shutdown (send RST rather than FIN). Note that we might still end up + * sending FIN due to the rather interesting implementation. + * + * This is async-signal-safe and ignores errors. Obviously, subsequent + * read() and write() operations to the socket will fail. During normal + * operation, just call ::shutdown() on the socket. + */ + void shutdown(int fd, bool abortive=false); + + /** + * Shut down all sockets managed by ShutdownSocketSet. This is + * async-signal-safe and ignores errors. + */ + void shutdownAll(bool abortive=false); + + private: + void doShutdown(int fd, bool abortive); + + // State transitions: + // add(): + // FREE -> IN_USE + // + // close(): + // IN_USE -> (::close()) -> FREE + // SHUT_DOWN -> (::close()) -> FREE + // IN_SHUTDOWN -> MUST_CLOSE + // (If the socket is currently being shut down, we must defer the + // ::close() until the shutdown completes) + // + // shutdown(): + // IN_USE -> IN_SHUTDOWN + // (::shutdown()) + // IN_SHUTDOWN -> SHUT_DOWN + // MUST_CLOSE -> (::close()) -> FREE + enum State : uint8_t { + FREE = 0, + IN_USE, + IN_SHUTDOWN, + SHUT_DOWN, + MUST_CLOSE + }; + + struct Free { + template + void operator()(T* ptr) const { + ::free(ptr); + } + }; + + const size_t maxFd_; + std::unique_ptr[], Free> data_; + folly::File nullFile_; +}; + +} // namespaces