From 889093b27de6e53cf5e90eb3ff7006d0854eb646 Mon Sep 17 00:00:00 2001 From: Viswanath Sivakumar Date: Mon, 27 Oct 2014 19:38:10 -0700 Subject: [PATCH] Revert "D1587625 [thrift->folly] AsyncSocket" Summary: Test Plan: Reviewed By: cgheorghe@fb.com Subscribers: FB internal diff: D1642334 Blame Revision: D1587625 --- folly/Makefile.am | 3 - folly/io/async/AsyncSocket.cpp | 1967 ------------------------- folly/io/async/AsyncSocket.h | 766 ---------- folly/io/async/AsyncSocketException.h | 79 - folly/io/async/AsyncTransport.h | 317 ---- 5 files changed, 3132 deletions(-) delete mode 100644 folly/io/async/AsyncSocket.cpp delete mode 100644 folly/io/async/AsyncSocket.h delete mode 100644 folly/io/async/AsyncSocketException.h delete mode 100644 folly/io/async/AsyncTransport.h diff --git a/folly/Makefile.am b/folly/Makefile.am index 0d7f99fc..8b58320e 100644 --- a/folly/Makefile.am +++ b/folly/Makefile.am @@ -132,8 +132,6 @@ nobase_follyinclude_HEADERS = \ io/async/AsyncTimeout.h \ io/async/AsyncTransport.h \ io/async/AsyncServerSocket.h \ - io/async/AsyncSocket.h \ - io/async/AsyncSocketException.h \ io/async/DelayedDestruction.h \ io/async/EventBase.h \ io/async/EventBaseManager.h \ @@ -266,7 +264,6 @@ libfolly_la_SOURCES = \ io/ShutdownSocketSet.cpp \ io/async/AsyncTimeout.cpp \ io/async/AsyncServerSocket.cpp \ - io/async/AsyncSocket.cpp \ io/async/EventBase.cpp \ io/async/EventBaseManager.cpp \ io/async/EventHandler.cpp \ diff --git a/folly/io/async/AsyncSocket.cpp b/folly/io/async/AsyncSocket.cpp deleted file mode 100644 index 69cfe867..00000000 --- a/folly/io/async/AsyncSocket.cpp +++ /dev/null @@ -1,1967 +0,0 @@ -/* - * Copyright 2014 Facebook, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include - -#include -#include -#include - -#include -#include -#include -#include -#include -#include -#include -#include -#include - -using std::string; -using std::unique_ptr; - -namespace folly { - -// static members initializers -const AsyncSocket::OptionMap AsyncSocket::emptyOptionMap; -const folly::SocketAddress AsyncSocket::anyAddress = - folly::SocketAddress("0.0.0.0", 0); - -const AsyncSocketException socketClosedLocallyEx( - AsyncSocketException::END_OF_FILE, "socket closed locally"); -const AsyncSocketException socketShutdownForWritesEx( - AsyncSocketException::END_OF_FILE, "socket shutdown for writes"); - -// TODO: It might help performance to provide a version of WriteRequest that -// users could derive from, so we can avoid the extra allocation for each call -// to write()/writev(). We could templatize TFramedAsyncChannel just like the -// protocols are currently templatized for transports. -// -// We would need the version for external users where they provide the iovec -// storage space, and only our internal version would allocate it at the end of -// the WriteRequest. - -/** - * A WriteRequest object tracks information about a pending write() or writev() - * operation. - * - * A new WriteRequest operation is allocated on the heap for all write - * operations that cannot be completed immediately. - */ -class AsyncSocket::WriteRequest { - public: - static WriteRequest* newRequest(WriteCallback* callback, - const iovec* ops, - uint32_t opCount, - unique_ptr&& ioBuf, - WriteFlags flags) { - assert(opCount > 0); - // Since we put a variable size iovec array at the end - // of each WriteRequest, we have to manually allocate the memory. - void* buf = malloc(sizeof(WriteRequest) + - (opCount * sizeof(struct iovec))); - if (buf == nullptr) { - throw std::bad_alloc(); - } - - return new(buf) WriteRequest(callback, ops, opCount, std::move(ioBuf), - flags); - } - - void destroy() { - this->~WriteRequest(); - free(this); - } - - bool cork() const { - return isSet(flags_, WriteFlags::CORK); - } - - WriteFlags flags() const { - return flags_; - } - - WriteRequest* getNext() const { - return next_; - } - - WriteCallback* getCallback() const { - return callback_; - } - - uint32_t getBytesWritten() const { - return bytesWritten_; - } - - const struct iovec* getOps() const { - assert(opCount_ > opIndex_); - return writeOps_ + opIndex_; - } - - uint32_t getOpCount() const { - assert(opCount_ > opIndex_); - return opCount_ - opIndex_; - } - - void consume(uint32_t wholeOps, uint32_t partialBytes, - uint32_t totalBytesWritten) { - // Advance opIndex_ forward by wholeOps - opIndex_ += wholeOps; - assert(opIndex_ < opCount_); - - // If we've finished writing any IOBufs, release them - if (ioBuf_) { - for (uint32_t i = wholeOps; i != 0; --i) { - assert(ioBuf_); - ioBuf_ = ioBuf_->pop(); - } - } - - // Move partialBytes forward into the current iovec buffer - struct iovec* currentOp = writeOps_ + opIndex_; - assert((partialBytes < currentOp->iov_len) || (currentOp->iov_len == 0)); - currentOp->iov_base = - reinterpret_cast(currentOp->iov_base) + partialBytes; - currentOp->iov_len -= partialBytes; - - // Increment the bytesWritten_ count by totalBytesWritten - bytesWritten_ += totalBytesWritten; - } - - void append(WriteRequest* next) { - assert(next_ == nullptr); - next_ = next; - } - - private: - WriteRequest(WriteCallback* callback, - const struct iovec* ops, - uint32_t opCount, - unique_ptr&& ioBuf, - WriteFlags flags) - : next_(nullptr) - , callback_(callback) - , bytesWritten_(0) - , opCount_(opCount) - , opIndex_(0) - , flags_(flags) - , ioBuf_(std::move(ioBuf)) { - memcpy(writeOps_, ops, sizeof(*ops) * opCount_); - } - - // Private destructor, to ensure callers use destroy() - ~WriteRequest() {} - - WriteRequest* next_; ///< pointer to next WriteRequest - WriteCallback* callback_; ///< completion callback - uint32_t bytesWritten_; ///< bytes written - uint32_t opCount_; ///< number of entries in writeOps_ - uint32_t opIndex_; ///< current index into writeOps_ - WriteFlags flags_; ///< set for WriteFlags - unique_ptr ioBuf_; ///< underlying IOBuf, or nullptr if N/A - struct iovec writeOps_[]; ///< write operation(s) list -}; - -AsyncSocket::AsyncSocket(EventBase* evb) - : eventBase_(evb) - , writeTimeout_(this, evb) - , ioHandler_(this, evb) { - VLOG(5) << "new AsyncSocket(" << this << ", evb=" << evb << ")"; - init(); -} - -AsyncSocket::AsyncSocket(EventBase* evb, - const folly::SocketAddress& address, - uint32_t connectTimeout) - : eventBase_(evb) - , writeTimeout_(this, evb) - , ioHandler_(this, evb) { - VLOG(5) << "new AsyncSocket(" << this << ", evb=" << evb << ")"; - init(); - connect(nullptr, address, connectTimeout); -} - -AsyncSocket::AsyncSocket(EventBase* evb, - const std::string& ip, - uint16_t port, - uint32_t connectTimeout) - : eventBase_(evb) - , writeTimeout_(this, evb) - , ioHandler_(this, evb) { - VLOG(5) << "new AsyncSocket(" << this << ", evb=" << evb << ")"; - init(); - connect(nullptr, ip, port, connectTimeout); -} - -AsyncSocket::AsyncSocket(EventBase* evb, int fd) - : eventBase_(evb) - , writeTimeout_(this, evb) - , ioHandler_(this, evb, fd) { - VLOG(5) << "new AsyncSocket(" << this << ", evb=" << evb << ", fd=" - << fd << ")"; - init(); - fd_ = fd; - state_ = StateEnum::ESTABLISHED; -} - -// init() method, since constructor forwarding isn't supported in most -// compilers yet. -void AsyncSocket::init() { - assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread()); - shutdownFlags_ = 0; - state_ = StateEnum::UNINIT; - eventFlags_ = EventHandler::NONE; - fd_ = -1; - sendTimeout_ = 0; - maxReadsPerEvent_ = 0; - connectCallback_ = nullptr; - readCallback_ = nullptr; - writeReqHead_ = nullptr; - writeReqTail_ = nullptr; - shutdownSocketSet_ = nullptr; - appBytesWritten_ = 0; - appBytesReceived_ = 0; -} - -AsyncSocket::~AsyncSocket() { - VLOG(7) << "actual destruction of AsyncSocket(this=" << this - << ", evb=" << eventBase_ << ", fd=" << fd_ - << ", state=" << state_ << ")"; -} - -void AsyncSocket::destroy() { - VLOG(5) << "AsyncSocket::destroy(this=" << this << ", evb=" << eventBase_ - << ", fd=" << fd_ << ", state=" << state_; - // When destroy is called, close the socket immediately - closeNow(); - - // Then call DelayedDestruction::destroy() to take care of - // whether or not we need immediate or delayed destruction - DelayedDestruction::destroy(); -} - -int AsyncSocket::detachFd() { - VLOG(6) << "AsyncSocket::detachFd(this=" << this << ", fd=" << fd_ - << ", evb=" << eventBase_ << ", state=" << state_ - << ", events=" << std::hex << eventFlags_ << ")"; - // Extract the fd, and set fd_ to -1 first, so closeNow() won't - // actually close the descriptor. - if (shutdownSocketSet_) { - shutdownSocketSet_->remove(fd_); - } - int fd = fd_; - fd_ = -1; - // Call closeNow() to invoke all pending callbacks with an error. - closeNow(); - // Update the EventHandler to stop using this fd. - // This can only be done after closeNow() unregisters the handler. - ioHandler_.changeHandlerFD(-1); - return fd; -} - -void AsyncSocket::setShutdownSocketSet(ShutdownSocketSet* newSS) { - if (shutdownSocketSet_ == newSS) { - return; - } - if (shutdownSocketSet_ && fd_ != -1) { - shutdownSocketSet_->remove(fd_); - } - shutdownSocketSet_ = newSS; - if (shutdownSocketSet_ && fd_ != -1) { - shutdownSocketSet_->add(fd_); - } -} - -void AsyncSocket::connect(ConnectCallback* callback, - const folly::SocketAddress& address, - int timeout, - const OptionMap &options, - const folly::SocketAddress& bindAddr) noexcept { - DestructorGuard dg(this); - assert(eventBase_->isInEventBaseThread()); - - addr_ = address; - - // Make sure we're in the uninitialized state - if (state_ != StateEnum::UNINIT) { - return invalidState(callback); - } - - assert(fd_ == -1); - state_ = StateEnum::CONNECTING; - connectCallback_ = callback; - - sockaddr_storage addrStorage; - sockaddr* saddr = reinterpret_cast(&addrStorage); - - try { - // Create the socket - // Technically the first parameter should actually be a protocol family - // constant (PF_xxx) rather than an address family (AF_xxx), but the - // distinction is mainly just historical. In pretty much all - // implementations the PF_foo and AF_foo constants are identical. - fd_ = socket(address.getFamily(), SOCK_STREAM, 0); - if (fd_ < 0) { - throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR, - withAddr("failed to create socket"), errno); - } - if (shutdownSocketSet_) { - shutdownSocketSet_->add(fd_); - } - ioHandler_.changeHandlerFD(fd_); - - // Set the FD_CLOEXEC flag so that the socket will be closed if the program - // later forks and execs. - int rv = fcntl(fd_, F_SETFD, FD_CLOEXEC); - if (rv != 0) { - throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR, - withAddr("failed to set close-on-exec flag"), - errno); - } - - // Put the socket in non-blocking mode - int flags = fcntl(fd_, F_GETFL, 0); - if (flags == -1) { - throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR, - withAddr("failed to get socket flags"), errno); - } - rv = fcntl(fd_, F_SETFL, flags | O_NONBLOCK); - if (rv == -1) { - throw AsyncSocketException( - AsyncSocketException::INTERNAL_ERROR, - withAddr("failed to put socket in non-blocking mode"), - errno); - } - -#if !defined(MSG_NOSIGNAL) && defined(F_SETNOSIGPIPE) - // iOS and OS X don't support MSG_NOSIGNAL; set F_SETNOSIGPIPE instead - rv = fcntl(fd_, F_SETNOSIGPIPE, 1); - if (rv == -1) { - throw AsyncSocketException( - AsyncSocketException::INTERNAL_ERROR, - "failed to enable F_SETNOSIGPIPE on socket", - errno); - } -#endif - - // By default, turn on TCP_NODELAY - // If setNoDelay() fails, we continue anyway; this isn't a fatal error. - // setNoDelay() will log an error message if it fails. - if (address.getFamily() != AF_UNIX) { - (void)setNoDelay(true); - } - - VLOG(5) << "AsyncSocket::connect(this=" << this << ", evb=" << eventBase_ - << ", fd=" << fd_ << ", host=" << address.describe().c_str(); - - // bind the socket - if (bindAddr != anyAddress) { - int one = 1; - if (::setsockopt(fd_, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one))) { - doClose(); - throw AsyncSocketException( - AsyncSocketException::NOT_OPEN, - "failed to setsockopt prior to bind on " + bindAddr.describe(), - errno); - } - - bindAddr.getAddress(&addrStorage); - - if (::bind(fd_, saddr, bindAddr.getActualSize()) != 0) { - doClose(); - throw AsyncSocketException(AsyncSocketException::NOT_OPEN, - "failed to bind to async socket: " + - bindAddr.describe(), - errno); - } - } - - // Apply the additional options if any. - for (const auto& opt: options) { - int rv = opt.first.apply(fd_, opt.second); - if (rv != 0) { - throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR, - withAddr("failed to set socket option"), - errno); - } - } - - // Perform the connect() - address.getAddress(&addrStorage); - - rv = ::connect(fd_, saddr, address.getActualSize()); - if (rv < 0) { - if (errno == EINPROGRESS) { - // Connection in progress. - if (timeout > 0) { - // Start a timer in case the connection takes too long. - if (!writeTimeout_.scheduleTimeout(timeout)) { - throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR, - withAddr("failed to schedule AsyncSocket connect timeout")); - } - } - - // Register for write events, so we'll - // be notified when the connection finishes/fails. - // Note that we don't register for a persistent event here. - assert(eventFlags_ == EventHandler::NONE); - eventFlags_ = EventHandler::WRITE; - if (!ioHandler_.registerHandler(eventFlags_)) { - throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR, - withAddr("failed to register AsyncSocket connect handler")); - } - return; - } else { - throw AsyncSocketException(AsyncSocketException::NOT_OPEN, - "connect failed (immediately)", errno); - } - } - - // If we're still here the connect() succeeded immediately. - // Fall through to call the callback outside of this try...catch block - } catch (const AsyncSocketException& ex) { - return failConnect(__func__, ex); - } catch (const std::exception& ex) { - // shouldn't happen, but handle it just in case - VLOG(4) << "AsyncSocket::connect(this=" << this << ", fd=" << fd_ - << "): unexpected " << typeid(ex).name() << " exception: " - << ex.what(); - AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR, - withAddr(string("unexpected exception: ") + - ex.what())); - return failConnect(__func__, tex); - } - - // The connection succeeded immediately - // The read callback may not have been set yet, and no writes may be pending - // yet, so we don't have to register for any events at the moment. - VLOG(8) << "AsyncSocket::connect succeeded immediately; this=" << this; - assert(readCallback_ == nullptr); - assert(writeReqHead_ == nullptr); - state_ = StateEnum::ESTABLISHED; - if (callback) { - connectCallback_ = nullptr; - callback->connectSuccess(); - } -} - -void AsyncSocket::connect(ConnectCallback* callback, - const string& ip, uint16_t port, - int timeout, - const OptionMap &options) noexcept { - DestructorGuard dg(this); - try { - connectCallback_ = callback; - connect(callback, folly::SocketAddress(ip, port), timeout, options); - } catch (const std::exception& ex) { - AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR, - ex.what()); - return failConnect(__func__, tex); - } -} - -void AsyncSocket::setSendTimeout(uint32_t milliseconds) { - sendTimeout_ = milliseconds; - assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread()); - - // If we are currently pending on write requests, immediately update - // writeTimeout_ with the new value. - if ((eventFlags_ & EventHandler::WRITE) && - (state_ != StateEnum::CONNECTING)) { - assert(state_ == StateEnum::ESTABLISHED); - assert((shutdownFlags_ & SHUT_WRITE) == 0); - if (sendTimeout_ > 0) { - if (!writeTimeout_.scheduleTimeout(sendTimeout_)) { - AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR, - withAddr("failed to reschedule send timeout in setSendTimeout")); - return failWrite(__func__, ex); - } - } else { - writeTimeout_.cancelTimeout(); - } - } -} - -void AsyncSocket::setReadCB(ReadCallback *callback) { - VLOG(6) << "AsyncSocket::setReadCallback() this=" << this << ", fd=" << fd_ - << ", callback=" << callback << ", state=" << state_; - - // Short circuit if callback is the same as the existing readCallback_. - // - // Note that this is needed for proper functioning during some cleanup cases. - // During cleanup we allow setReadCallback(nullptr) to be called even if the - // read callback is already unset and we have been detached from an event - // base. This check prevents us from asserting - // eventBase_->isInEventBaseThread() when eventBase_ is nullptr. - if (callback == readCallback_) { - return; - } - - if (shutdownFlags_ & SHUT_READ) { - // Reads have already been shut down on this socket. - // - // Allow setReadCallback(nullptr) to be called in this case, but don't - // allow a new callback to be set. - // - // For example, setReadCallback(nullptr) can happen after an error if we - // invoke some other error callback before invoking readError(). The other - // error callback that is invoked first may go ahead and clear the read - // callback before we get a chance to invoke readError(). - if (callback != nullptr) { - return invalidState(callback); - } - assert((eventFlags_ & EventHandler::READ) == 0); - readCallback_ = nullptr; - return; - } - - DestructorGuard dg(this); - assert(eventBase_->isInEventBaseThread()); - - switch ((StateEnum)state_) { - case StateEnum::CONNECTING: - // For convenience, we allow the read callback to be set while we are - // still connecting. We just store the callback for now. Once the - // connection completes we'll register for read events. - readCallback_ = callback; - return; - case StateEnum::ESTABLISHED: - { - readCallback_ = callback; - uint16_t oldFlags = eventFlags_; - if (readCallback_) { - eventFlags_ |= EventHandler::READ; - } else { - eventFlags_ &= ~EventHandler::READ; - } - - // Update our registration if our flags have changed - if (eventFlags_ != oldFlags) { - // We intentionally ignore the return value here. - // updateEventRegistration() will move us into the error state if it - // fails, and we don't need to do anything else here afterwards. - (void)updateEventRegistration(); - } - - if (readCallback_) { - checkForImmediateRead(); - } - return; - } - case StateEnum::CLOSED: - case StateEnum::ERROR: - // We should never reach here. SHUT_READ should always be set - // if we are in STATE_CLOSED or STATE_ERROR. - assert(false); - return invalidState(callback); - case StateEnum::UNINIT: - // We do not allow setReadCallback() to be called before we start - // connecting. - return invalidState(callback); - } - - // We don't put a default case in the switch statement, so that the compiler - // will warn us to update the switch statement if a new state is added. - return invalidState(callback); -} - -AsyncSocket::ReadCallback* AsyncSocket::getReadCallback() const { - return readCallback_; -} - -void AsyncSocket::write(WriteCallback* callback, - const void* buf, size_t bytes, WriteFlags flags) { - iovec op; - op.iov_base = const_cast(buf); - op.iov_len = bytes; - writeImpl(callback, &op, 1, std::move(unique_ptr()), flags); -} - -void AsyncSocket::writev(WriteCallback* callback, - const iovec* vec, - size_t count, - WriteFlags flags) { - writeImpl(callback, vec, count, std::move(unique_ptr()), flags); -} - -void AsyncSocket::writeChain(WriteCallback* callback, unique_ptr&& buf, - WriteFlags flags) { - size_t count = buf->countChainElements(); - if (count <= 64) { - iovec vec[count]; - writeChainImpl(callback, vec, count, std::move(buf), flags); - } else { - iovec* vec = new iovec[count]; - writeChainImpl(callback, vec, count, std::move(buf), flags); - delete[] vec; - } -} - -void AsyncSocket::writeChainImpl(WriteCallback* callback, iovec* vec, - size_t count, unique_ptr&& buf, WriteFlags flags) { - const IOBuf* head = buf.get(); - const IOBuf* next = head; - unsigned i = 0; - do { - vec[i].iov_base = const_cast(next->data()); - vec[i].iov_len = next->length(); - // IOBuf can get confused by empty iovec buffers, so increment the - // output pointer only if the iovec buffer is non-empty. We could - // end the loop with i < count, but that's ok. - if (vec[i].iov_len != 0) { - i++; - } - next = next->next(); - } while (next != head); - writeImpl(callback, vec, i, std::move(buf), flags); -} - -void AsyncSocket::writeImpl(WriteCallback* callback, const iovec* vec, - size_t count, unique_ptr&& buf, - WriteFlags flags) { - VLOG(6) << "AsyncSocket::writev() this=" << this << ", fd=" << fd_ - << ", callback=" << callback << ", count=" << count - << ", state=" << state_; - DestructorGuard dg(this); - unique_ptrioBuf(std::move(buf)); - assert(eventBase_->isInEventBaseThread()); - - if (shutdownFlags_ & (SHUT_WRITE | SHUT_WRITE_PENDING)) { - // No new writes may be performed after the write side of the socket has - // been shutdown. - // - // We could just call callback->writeError() here to fail just this write. - // However, fail hard and use invalidState() to fail all outstanding - // callbacks and move the socket into the error state. There's most likely - // a bug in the caller's code, so we abort everything rather than trying to - // proceed as best we can. - return invalidState(callback); - } - - uint32_t countWritten = 0; - uint32_t partialWritten = 0; - int bytesWritten = 0; - bool mustRegister = false; - if (state_ == StateEnum::ESTABLISHED && !connecting()) { - if (writeReqHead_ == nullptr) { - // If we are established and there are no other writes pending, - // we can attempt to perform the write immediately. - assert(writeReqTail_ == nullptr); - assert((eventFlags_ & EventHandler::WRITE) == 0); - - bytesWritten = performWrite(vec, count, flags, - &countWritten, &partialWritten); - if (bytesWritten < 0) { - AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR, - withAddr("writev failed"), errno); - return failWrite(__func__, callback, 0, ex); - } else if (countWritten == count) { - // We successfully wrote everything. - // Invoke the callback and return. - if (callback) { - callback->writeSuccess(); - } - return; - } // else { continue writing the next writeReq } - mustRegister = true; - } - } else if (!connecting()) { - // Invalid state for writing - return invalidState(callback); - } - - // Create a new WriteRequest to add to the queue - WriteRequest* req; - try { - req = WriteRequest::newRequest(callback, vec + countWritten, - count - countWritten, std::move(ioBuf), - flags); - } catch (const std::exception& ex) { - // we mainly expect to catch std::bad_alloc here - AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR, - withAddr(string("failed to append new WriteRequest: ") + ex.what())); - return failWrite(__func__, callback, bytesWritten, tex); - } - req->consume(0, partialWritten, bytesWritten); - if (writeReqTail_ == nullptr) { - assert(writeReqHead_ == nullptr); - writeReqHead_ = writeReqTail_ = req; - } else { - writeReqTail_->append(req); - writeReqTail_ = req; - } - - // Register for write events if are established and not currently - // waiting on write events - if (mustRegister) { - assert(state_ == StateEnum::ESTABLISHED); - assert((eventFlags_ & EventHandler::WRITE) == 0); - if (!updateEventRegistration(EventHandler::WRITE, 0)) { - assert(state_ == StateEnum::ERROR); - return; - } - if (sendTimeout_ > 0) { - // Schedule a timeout to fire if the write takes too long. - if (!writeTimeout_.scheduleTimeout(sendTimeout_)) { - AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR, - withAddr("failed to schedule send timeout")); - return failWrite(__func__, ex); - } - } - } -} - -void AsyncSocket::close() { - VLOG(5) << "AsyncSocket::close(): this=" << this << ", fd_=" << fd_ - << ", state=" << state_ << ", shutdownFlags=" - << std::hex << (int) shutdownFlags_; - - // close() is only different from closeNow() when there are pending writes - // that need to drain before we can close. In all other cases, just call - // closeNow(). - // - // Note that writeReqHead_ can be non-nullptr even in STATE_CLOSED or - // STATE_ERROR if close() is invoked while a previous closeNow() or failure - // is still running. (e.g., If there are multiple pending writes, and we - // call writeError() on the first one, it may call close(). In this case we - // will already be in STATE_CLOSED or STATE_ERROR, but the remaining pending - // writes will still be in the queue.) - // - // We only need to drain pending writes if we are still in STATE_CONNECTING - // or STATE_ESTABLISHED - if ((writeReqHead_ == nullptr) || - !(state_ == StateEnum::CONNECTING || - state_ == StateEnum::ESTABLISHED)) { - closeNow(); - return; - } - - // Declare a DestructorGuard to ensure that the AsyncSocket cannot be - // destroyed until close() returns. - DestructorGuard dg(this); - assert(eventBase_->isInEventBaseThread()); - - // Since there are write requests pending, we have to set the - // SHUT_WRITE_PENDING flag, and wait to perform the real close until the - // connect finishes and we finish writing these requests. - // - // Set SHUT_READ to indicate that reads are shut down, and set the - // SHUT_WRITE_PENDING flag to mark that we want to shutdown once the - // pending writes complete. - shutdownFlags_ |= (SHUT_READ | SHUT_WRITE_PENDING); - - // If a read callback is set, invoke readEOF() immediately to inform it that - // the socket has been closed and no more data can be read. - if (readCallback_) { - // Disable reads if they are enabled - if (!updateEventRegistration(0, EventHandler::READ)) { - // We're now in the error state; callbacks have been cleaned up - assert(state_ == StateEnum::ERROR); - assert(readCallback_ == nullptr); - } else { - ReadCallback* callback = readCallback_; - readCallback_ = nullptr; - callback->readEOF(); - } - } -} - -void AsyncSocket::closeNow() { - VLOG(5) << "AsyncSocket::closeNow(): this=" << this << ", fd_=" << fd_ - << ", state=" << state_ << ", shutdownFlags=" - << std::hex << (int) shutdownFlags_; - DestructorGuard dg(this); - assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread()); - - switch (state_) { - case StateEnum::ESTABLISHED: - case StateEnum::CONNECTING: - { - shutdownFlags_ |= (SHUT_READ | SHUT_WRITE); - state_ = StateEnum::CLOSED; - - // If the write timeout was set, cancel it. - writeTimeout_.cancelTimeout(); - - // If we are registered for I/O events, unregister. - if (eventFlags_ != EventHandler::NONE) { - eventFlags_ = EventHandler::NONE; - if (!updateEventRegistration()) { - // We will have been moved into the error state. - assert(state_ == StateEnum::ERROR); - return; - } - } - - if (fd_ >= 0) { - ioHandler_.changeHandlerFD(-1); - doClose(); - } - - if (connectCallback_) { - ConnectCallback* callback = connectCallback_; - connectCallback_ = nullptr; - callback->connectErr(socketClosedLocallyEx); - } - - failAllWrites(socketClosedLocallyEx); - - if (readCallback_) { - ReadCallback* callback = readCallback_; - readCallback_ = nullptr; - callback->readEOF(); - } - return; - } - case StateEnum::CLOSED: - // Do nothing. It's possible that we are being called recursively - // from inside a callback that we invoked inside another call to close() - // that is still running. - return; - case StateEnum::ERROR: - // Do nothing. The error handling code has performed (or is performing) - // cleanup. - return; - case StateEnum::UNINIT: - assert(eventFlags_ == EventHandler::NONE); - assert(connectCallback_ == nullptr); - assert(readCallback_ == nullptr); - assert(writeReqHead_ == nullptr); - shutdownFlags_ |= (SHUT_READ | SHUT_WRITE); - state_ = StateEnum::CLOSED; - return; - } - - LOG(DFATAL) << "AsyncSocket::closeNow() (this=" << this << ", fd=" << fd_ - << ") called in unknown state " << state_; -} - -void AsyncSocket::closeWithReset() { - // Enable SO_LINGER, with the linger timeout set to 0. - // This will trigger a TCP reset when we close the socket. - if (fd_ >= 0) { - struct linger optLinger = {1, 0}; - if (setSockOpt(SOL_SOCKET, SO_LINGER, &optLinger) != 0) { - VLOG(2) << "AsyncSocket::closeWithReset(): error setting SO_LINGER " - << "on " << fd_ << ": errno=" << errno; - } - } - - // Then let closeNow() take care of the rest - closeNow(); -} - -void AsyncSocket::shutdownWrite() { - VLOG(5) << "AsyncSocket::shutdownWrite(): this=" << this << ", fd=" << fd_ - << ", state=" << state_ << ", shutdownFlags=" - << std::hex << (int) shutdownFlags_; - - // If there are no pending writes, shutdownWrite() is identical to - // shutdownWriteNow(). - if (writeReqHead_ == nullptr) { - shutdownWriteNow(); - return; - } - - assert(eventBase_->isInEventBaseThread()); - - // There are pending writes. Set SHUT_WRITE_PENDING so that the actual - // shutdown will be performed once all writes complete. - shutdownFlags_ |= SHUT_WRITE_PENDING; -} - -void AsyncSocket::shutdownWriteNow() { - VLOG(5) << "AsyncSocket::shutdownWriteNow(): this=" << this - << ", fd=" << fd_ << ", state=" << state_ - << ", shutdownFlags=" << std::hex << (int) shutdownFlags_; - - if (shutdownFlags_ & SHUT_WRITE) { - // Writes are already shutdown; nothing else to do. - return; - } - - // If SHUT_READ is already set, just call closeNow() to completely - // close the socket. This can happen if close() was called with writes - // pending, and then shutdownWriteNow() is called before all pending writes - // complete. - if (shutdownFlags_ & SHUT_READ) { - closeNow(); - return; - } - - DestructorGuard dg(this); - assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread()); - - switch (static_cast(state_)) { - case StateEnum::ESTABLISHED: - { - shutdownFlags_ |= SHUT_WRITE; - - // If the write timeout was set, cancel it. - writeTimeout_.cancelTimeout(); - - // If we are registered for write events, unregister. - if (!updateEventRegistration(0, EventHandler::WRITE)) { - // We will have been moved into the error state. - assert(state_ == StateEnum::ERROR); - return; - } - - // Shutdown writes on the file descriptor - ::shutdown(fd_, SHUT_WR); - - // Immediately fail all write requests - failAllWrites(socketShutdownForWritesEx); - return; - } - case StateEnum::CONNECTING: - { - // Set the SHUT_WRITE_PENDING flag. - // When the connection completes, it will check this flag, - // shutdown the write half of the socket, and then set SHUT_WRITE. - shutdownFlags_ |= SHUT_WRITE_PENDING; - - // Immediately fail all write requests - failAllWrites(socketShutdownForWritesEx); - return; - } - case StateEnum::UNINIT: - // Callers normally shouldn't call shutdownWriteNow() before the socket - // even starts connecting. Nonetheless, go ahead and set - // SHUT_WRITE_PENDING. Once the socket eventually connects it will - // immediately shut down the write side of the socket. - shutdownFlags_ |= SHUT_WRITE_PENDING; - return; - case StateEnum::CLOSED: - case StateEnum::ERROR: - // We should never get here. SHUT_WRITE should always be set - // in STATE_CLOSED and STATE_ERROR. - VLOG(4) << "AsyncSocket::shutdownWriteNow() (this=" << this - << ", fd=" << fd_ << ") in unexpected state " << state_ - << " with SHUT_WRITE not set (" - << std::hex << (int) shutdownFlags_ << ")"; - assert(false); - return; - } - - LOG(DFATAL) << "AsyncSocket::shutdownWriteNow() (this=" << this << ", fd=" - << fd_ << ") called in unknown state " << state_; -} - -bool AsyncSocket::readable() const { - if (fd_ == -1) { - return false; - } - struct pollfd fds[1]; - fds[0].fd = fd_; - fds[0].events = POLLIN; - fds[0].revents = 0; - int rc = poll(fds, 1, 0); - return rc == 1; -} - -bool AsyncSocket::isPending() const { - return ioHandler_.isPending(); -} - -bool AsyncSocket::hangup() const { - if (fd_ == -1) { - // sanity check, no one should ask for hangup if we are not connected. - assert(false); - return false; - } -#ifdef POLLRDHUP // Linux-only - struct pollfd fds[1]; - fds[0].fd = fd_; - fds[0].events = POLLRDHUP|POLLHUP; - fds[0].revents = 0; - poll(fds, 1, 0); - return (fds[0].revents & (POLLRDHUP|POLLHUP)) != 0; -#else - return false; -#endif -} - -bool AsyncSocket::good() const { - return ((state_ == StateEnum::CONNECTING || - state_ == StateEnum::ESTABLISHED) && - (shutdownFlags_ == 0) && (eventBase_ != nullptr)); -} - -bool AsyncSocket::error() const { - return (state_ == StateEnum::ERROR); -} - -void AsyncSocket::attachEventBase(EventBase* eventBase) { - VLOG(5) << "AsyncSocket::attachEventBase(this=" << this << ", fd=" << fd_ - << ", old evb=" << eventBase_ << ", new evb=" << eventBase - << ", state=" << state_ << ", events=" - << std::hex << eventFlags_ << ")"; - assert(eventBase_ == nullptr); - assert(eventBase->isInEventBaseThread()); - - eventBase_ = eventBase; - ioHandler_.attachEventBase(eventBase); - writeTimeout_.attachEventBase(eventBase); -} - -void AsyncSocket::detachEventBase() { - VLOG(5) << "AsyncSocket::detachEventBase(this=" << this << ", fd=" << fd_ - << ", old evb=" << eventBase_ << ", state=" << state_ - << ", events=" << std::hex << eventFlags_ << ")"; - assert(eventBase_ != nullptr); - assert(eventBase_->isInEventBaseThread()); - - eventBase_ = nullptr; - ioHandler_.detachEventBase(); - writeTimeout_.detachEventBase(); -} - -bool AsyncSocket::isDetachable() const { - DCHECK(eventBase_ != nullptr); - DCHECK(eventBase_->isInEventBaseThread()); - - return !ioHandler_.isHandlerRegistered() && !writeTimeout_.isScheduled(); -} - -void AsyncSocket::getLocalAddress(folly::SocketAddress* address) const { - address->setFromLocalAddress(fd_); -} - -void AsyncSocket::getPeerAddress(folly::SocketAddress* address) const { - if (!addr_.isInitialized()) { - addr_.setFromPeerAddress(fd_); - } - *address = addr_; -} - -int AsyncSocket::setNoDelay(bool noDelay) { - if (fd_ < 0) { - VLOG(4) << "AsyncSocket::setNoDelay() called on non-open socket " - << this << "(state=" << state_ << ")"; - return EINVAL; - - } - - int value = noDelay ? 1 : 0; - if (setsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &value, sizeof(value)) != 0) { - int errnoCopy = errno; - VLOG(2) << "failed to update TCP_NODELAY option on AsyncSocket " - << this << " (fd=" << fd_ << ", state=" << state_ << "): " - << strerror(errnoCopy); - return errnoCopy; - } - - return 0; -} - -int AsyncSocket::setCongestionFlavor(const std::string &cname) { - - #ifndef TCP_CONGESTION - #define TCP_CONGESTION 13 - #endif - - if (fd_ < 0) { - VLOG(4) << "AsyncSocket::setCongestionFlavor() called on non-open " - << "socket " << this << "(state=" << state_ << ")"; - return EINVAL; - - } - - if (setsockopt(fd_, IPPROTO_TCP, TCP_CONGESTION, cname.c_str(), - cname.length() + 1) != 0) { - int errnoCopy = errno; - VLOG(2) << "failed to update TCP_CONGESTION option on AsyncSocket " - << this << "(fd=" << fd_ << ", state=" << state_ << "): " - << strerror(errnoCopy); - return errnoCopy; - } - - return 0; -} - -int AsyncSocket::setQuickAck(bool quickack) { - if (fd_ < 0) { - VLOG(4) << "AsyncSocket::setQuickAck() called on non-open socket " - << this << "(state=" << state_ << ")"; - return EINVAL; - - } - -#ifdef TCP_QUICKACK // Linux-only - int value = quickack ? 1 : 0; - if (setsockopt(fd_, IPPROTO_TCP, TCP_QUICKACK, &value, sizeof(value)) != 0) { - int errnoCopy = errno; - VLOG(2) << "failed to update TCP_QUICKACK option on AsyncSocket" - << this << "(fd=" << fd_ << ", state=" << state_ << "): " - << strerror(errnoCopy); - return errnoCopy; - } - - return 0; -#else - return ENOSYS; -#endif -} - -int AsyncSocket::setSendBufSize(size_t bufsize) { - if (fd_ < 0) { - VLOG(4) << "AsyncSocket::setSendBufSize() called on non-open socket " - << this << "(state=" << state_ << ")"; - return EINVAL; - } - - if (setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &bufsize, sizeof(bufsize)) !=0) { - int errnoCopy = errno; - VLOG(2) << "failed to update SO_SNDBUF option on AsyncSocket" - << this << "(fd=" << fd_ << ", state=" << state_ << "): " - << strerror(errnoCopy); - return errnoCopy; - } - - return 0; -} - -int AsyncSocket::setRecvBufSize(size_t bufsize) { - if (fd_ < 0) { - VLOG(4) << "AsyncSocket::setRecvBufSize() called on non-open socket " - << this << "(state=" << state_ << ")"; - return EINVAL; - } - - if (setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &bufsize, sizeof(bufsize)) !=0) { - int errnoCopy = errno; - VLOG(2) << "failed to update SO_RCVBUF option on AsyncSocket" - << this << "(fd=" << fd_ << ", state=" << state_ << "): " - << strerror(errnoCopy); - return errnoCopy; - } - - return 0; -} - -int AsyncSocket::setTCPProfile(int profd) { - if (fd_ < 0) { - VLOG(4) << "AsyncSocket::setTCPProfile() called on non-open socket " - << this << "(state=" << state_ << ")"; - return EINVAL; - } - - if (setsockopt(fd_, SOL_SOCKET, SO_SET_NAMESPACE, &profd, sizeof(int)) !=0) { - int errnoCopy = errno; - VLOG(2) << "failed to set socket namespace option on AsyncSocket" - << this << "(fd=" << fd_ << ", state=" << state_ << "): " - << strerror(errnoCopy); - return errnoCopy; - } - - return 0; -} - -void AsyncSocket::ioReady(uint16_t events) noexcept { - VLOG(7) << "AsyncSocket::ioRead() this=" << this << ", fd" << fd_ - << ", events=" << std::hex << events << ", state=" << state_; - DestructorGuard dg(this); - assert(events & EventHandler::READ_WRITE); - assert(eventBase_->isInEventBaseThread()); - - uint16_t relevantEvents = events & EventHandler::READ_WRITE; - if (relevantEvents == EventHandler::READ) { - handleRead(); - } else if (relevantEvents == EventHandler::WRITE) { - handleWrite(); - } else if (relevantEvents == EventHandler::READ_WRITE) { - EventBase* originalEventBase = eventBase_; - // If both read and write events are ready, process writes first. - handleWrite(); - - // Return now if handleWrite() detached us from our EventBase - if (eventBase_ != originalEventBase) { - return; - } - - // Only call handleRead() if a read callback is still installed. - // (It's possible that the read callback was uninstalled during - // handleWrite().) - if (readCallback_) { - handleRead(); - } - } else { - VLOG(4) << "AsyncSocket::ioRead() called with unexpected events " - << std::hex << events << "(this=" << this << ")"; - abort(); - } -} - -ssize_t AsyncSocket::performRead(void* buf, size_t buflen) { - ssize_t bytes = recv(fd_, buf, buflen, MSG_DONTWAIT); - if (bytes < 0) { - if (errno == EAGAIN || errno == EWOULDBLOCK) { - // No more data to read right now. - return READ_BLOCKING; - } else { - return READ_ERROR; - } - } else { - appBytesReceived_ += bytes; - return bytes; - } -} - -void AsyncSocket::handleRead() noexcept { - VLOG(5) << "AsyncSocket::handleRead() this=" << this << ", fd=" << fd_ - << ", state=" << state_; - assert(state_ == StateEnum::ESTABLISHED); - assert((shutdownFlags_ & SHUT_READ) == 0); - assert(readCallback_ != nullptr); - assert(eventFlags_ & EventHandler::READ); - - // Loop until: - // - a read attempt would block - // - readCallback_ is uninstalled - // - the number of loop iterations exceeds the optional maximum - // - this AsyncSocket is moved to another EventBase - // - // When we invoke readDataAvailable() it may uninstall the readCallback_, - // which is why need to check for it here. - // - // The last bullet point is slightly subtle. readDataAvailable() may also - // detach this socket from this EventBase. However, before - // readDataAvailable() returns another thread may pick it up, attach it to - // a different EventBase, and install another readCallback_. We need to - // exit immediately after readDataAvailable() returns if the eventBase_ has - // changed. (The caller must perform some sort of locking to transfer the - // AsyncSocket between threads properly. This will be sufficient to ensure - // that this thread sees the updated eventBase_ variable after - // readDataAvailable() returns.) - uint16_t numReads = 0; - EventBase* originalEventBase = eventBase_; - while (readCallback_ && eventBase_ == originalEventBase) { - // Get the buffer to read into. - void* buf = nullptr; - size_t buflen = 0; - try { - readCallback_->getReadBuffer(&buf, &buflen); - } catch (const AsyncSocketException& ex) { - return failRead(__func__, ex); - } catch (const std::exception& ex) { - AsyncSocketException tex(AsyncSocketException::BAD_ARGS, - string("ReadCallback::getReadBuffer() " - "threw exception: ") + - ex.what()); - return failRead(__func__, tex); - } catch (...) { - AsyncSocketException ex(AsyncSocketException::BAD_ARGS, - "ReadCallback::getReadBuffer() threw " - "non-exception type"); - return failRead(__func__, ex); - } - if (buf == nullptr || buflen == 0) { - AsyncSocketException ex(AsyncSocketException::BAD_ARGS, - "ReadCallback::getReadBuffer() returned " - "empty buffer"); - return failRead(__func__, ex); - } - - // Perform the read - ssize_t bytesRead = performRead(buf, buflen); - if (bytesRead > 0) { - readCallback_->readDataAvailable(bytesRead); - // Fall through and continue around the loop if the read - // completely filled the available buffer. - // Note that readCallback_ may have been uninstalled or changed inside - // readDataAvailable(). - if (bytesRead < buflen) { - return; - } - } else if (bytesRead == READ_BLOCKING) { - // No more data to read right now. - return; - } else if (bytesRead == READ_ERROR) { - AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR, - withAddr("recv() failed"), errno); - return failRead(__func__, ex); - } else { - assert(bytesRead == READ_EOF); - // EOF - shutdownFlags_ |= SHUT_READ; - if (!updateEventRegistration(0, EventHandler::READ)) { - // we've already been moved into STATE_ERROR - assert(state_ == StateEnum::ERROR); - assert(readCallback_ == nullptr); - return; - } - - ReadCallback* callback = readCallback_; - readCallback_ = nullptr; - callback->readEOF(); - return; - } - if (maxReadsPerEvent_ && (++numReads >= maxReadsPerEvent_)) { - return; - } - } -} - -/** - * This function attempts to write as much data as possible, until no more data - * can be written. - * - * - If it sends all available data, it unregisters for write events, and stops - * the writeTimeout_. - * - * - If not all of the data can be sent immediately, it reschedules - * writeTimeout_ (if a non-zero timeout is set), and ensures the handler is - * registered for write events. - */ -void AsyncSocket::handleWrite() noexcept { - VLOG(5) << "AsyncSocket::handleWrite() this=" << this << ", fd=" << fd_ - << ", state=" << state_; - if (state_ == StateEnum::CONNECTING) { - handleConnect(); - return; - } - - // Normal write - assert(state_ == StateEnum::ESTABLISHED); - assert((shutdownFlags_ & SHUT_WRITE) == 0); - assert(writeReqHead_ != nullptr); - - // Loop until we run out of write requests, - // or until this socket is moved to another EventBase. - // (See the comment in handleRead() explaining how this can happen.) - EventBase* originalEventBase = eventBase_; - while (writeReqHead_ != nullptr && eventBase_ == originalEventBase) { - uint32_t countWritten; - uint32_t partialWritten; - WriteFlags writeFlags = writeReqHead_->flags(); - if (writeReqHead_->getNext() != nullptr) { - writeFlags = writeFlags | WriteFlags::CORK; - } - int bytesWritten = performWrite(writeReqHead_->getOps(), - writeReqHead_->getOpCount(), - writeFlags, &countWritten, &partialWritten); - if (bytesWritten < 0) { - AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR, - withAddr("writev() failed"), errno); - return failWrite(__func__, ex); - } else if (countWritten == writeReqHead_->getOpCount()) { - // We finished this request - WriteRequest* req = writeReqHead_; - writeReqHead_ = req->getNext(); - - if (writeReqHead_ == nullptr) { - writeReqTail_ = nullptr; - // This is the last write request. - // Unregister for write events and cancel the send timer - // before we invoke the callback. We have to update the state properly - // before calling the callback, since it may want to detach us from - // the EventBase. - if (eventFlags_ & EventHandler::WRITE) { - if (!updateEventRegistration(0, EventHandler::WRITE)) { - assert(state_ == StateEnum::ERROR); - return; - } - // Stop the send timeout - writeTimeout_.cancelTimeout(); - } - assert(!writeTimeout_.isScheduled()); - - // If SHUT_WRITE_PENDING is set, we should shutdown the socket after - // we finish sending the last write request. - // - // We have to do this before invoking writeSuccess(), since - // writeSuccess() may detach us from our EventBase. - if (shutdownFlags_ & SHUT_WRITE_PENDING) { - assert(connectCallback_ == nullptr); - shutdownFlags_ |= SHUT_WRITE; - - if (shutdownFlags_ & SHUT_READ) { - // Reads have already been shutdown. Fully close the socket and - // move to STATE_CLOSED. - // - // Note: This code currently moves us to STATE_CLOSED even if - // close() hasn't ever been called. This can occur if we have - // received EOF from the peer and shutdownWrite() has been called - // locally. Should we bother staying in STATE_ESTABLISHED in this - // case, until close() is actually called? I can't think of a - // reason why we would need to do so. No other operations besides - // calling close() or destroying the socket can be performed at - // this point. - assert(readCallback_ == nullptr); - state_ = StateEnum::CLOSED; - if (fd_ >= 0) { - ioHandler_.changeHandlerFD(-1); - doClose(); - } - } else { - // Reads are still enabled, so we are only doing a half-shutdown - ::shutdown(fd_, SHUT_WR); - } - } - } - - // Invoke the callback - WriteCallback* callback = req->getCallback(); - req->destroy(); - if (callback) { - callback->writeSuccess(); - } - // We'll continue around the loop, trying to write another request - } else { - // Partial write. - writeReqHead_->consume(countWritten, partialWritten, bytesWritten); - // Stop after a partial write; it's highly likely that a subsequent write - // attempt will just return EAGAIN. - // - // Ensure that we are registered for write events. - if ((eventFlags_ & EventHandler::WRITE) == 0) { - if (!updateEventRegistration(EventHandler::WRITE, 0)) { - assert(state_ == StateEnum::ERROR); - return; - } - } - - // Reschedule the send timeout, since we have made some write progress. - if (sendTimeout_ > 0) { - if (!writeTimeout_.scheduleTimeout(sendTimeout_)) { - AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR, - withAddr("failed to reschedule write timeout")); - return failWrite(__func__, ex); - } - } - return; - } - } -} - -void AsyncSocket::checkForImmediateRead() noexcept { - // We currently don't attempt to perform optimistic reads in AsyncSocket. - // (However, note that some subclasses do override this method.) - // - // Simply calling handleRead() here would be bad, as this would call - // readCallback_->getReadBuffer(), forcing the callback to allocate a read - // buffer even though no data may be available. This would waste lots of - // memory, since the buffer will sit around unused until the socket actually - // becomes readable. - // - // Checking if the socket is readable now also seems like it would probably - // be a pessimism. In most cases it probably wouldn't be readable, and we - // would just waste an extra system call. Even if it is readable, waiting to - // find out from libevent on the next event loop doesn't seem that bad. -} - -void AsyncSocket::handleInitialReadWrite() noexcept { - // Our callers should already be holding a DestructorGuard, but grab - // one here just to make sure, in case one of our calling code paths ever - // changes. - DestructorGuard dg(this); - - // If we have a readCallback_, make sure we enable read events. We - // may already be registered for reads if connectSuccess() set - // the read calback. - if (readCallback_ && !(eventFlags_ & EventHandler::READ)) { - assert(state_ == StateEnum::ESTABLISHED); - assert((shutdownFlags_ & SHUT_READ) == 0); - if (!updateEventRegistration(EventHandler::READ, 0)) { - assert(state_ == StateEnum::ERROR); - return; - } - checkForImmediateRead(); - } else if (readCallback_ == nullptr) { - // Unregister for read events. - updateEventRegistration(0, EventHandler::READ); - } - - // If we have write requests pending, try to send them immediately. - // Since we just finished accepting, there is a very good chance that we can - // write without blocking. - // - // However, we only process them if EventHandler::WRITE is not already set, - // which means that we're already blocked on a write attempt. (This can - // happen if connectSuccess() called write() before returning.) - if (writeReqHead_ && !(eventFlags_ & EventHandler::WRITE)) { - // Call handleWrite() to perform write processing. - handleWrite(); - } else if (writeReqHead_ == nullptr) { - // Unregister for write event. - updateEventRegistration(0, EventHandler::WRITE); - } -} - -void AsyncSocket::handleConnect() noexcept { - VLOG(5) << "AsyncSocket::handleConnect() this=" << this << ", fd=" << fd_ - << ", state=" << state_; - assert(state_ == StateEnum::CONNECTING); - // SHUT_WRITE can never be set while we are still connecting; - // SHUT_WRITE_PENDING may be set, be we only set SHUT_WRITE once the connect - // finishes - assert((shutdownFlags_ & SHUT_WRITE) == 0); - - // In case we had a connect timeout, cancel the timeout - writeTimeout_.cancelTimeout(); - // We don't use a persistent registration when waiting on a connect event, - // so we have been automatically unregistered now. Update eventFlags_ to - // reflect reality. - assert(eventFlags_ == EventHandler::WRITE); - eventFlags_ = EventHandler::NONE; - - // Call getsockopt() to check if the connect succeeded - int error; - socklen_t len = sizeof(error); - int rv = getsockopt(fd_, SOL_SOCKET, SO_ERROR, &error, &len); - if (rv != 0) { - AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR, - withAddr("error calling getsockopt() after connect"), - errno); - VLOG(4) << "AsyncSocket::handleConnect(this=" << this << ", fd=" - << fd_ << " host=" << addr_.describe() - << ") exception:" << ex.what(); - return failConnect(__func__, ex); - } - - if (error != 0) { - AsyncSocketException ex(AsyncSocketException::NOT_OPEN, - "connect failed", error); - VLOG(1) << "AsyncSocket::handleConnect(this=" << this << ", fd=" - << fd_ << " host=" << addr_.describe() - << ") exception: " << ex.what(); - return failConnect(__func__, ex); - } - - // Move into STATE_ESTABLISHED - state_ = StateEnum::ESTABLISHED; - - // If SHUT_WRITE_PENDING is set and we don't have any write requests to - // perform, immediately shutdown the write half of the socket. - if ((shutdownFlags_ & SHUT_WRITE_PENDING) && writeReqHead_ == nullptr) { - // SHUT_READ shouldn't be set. If close() is called on the socket while we - // are still connecting we just abort the connect rather than waiting for - // it to complete. - assert((shutdownFlags_ & SHUT_READ) == 0); - ::shutdown(fd_, SHUT_WR); - shutdownFlags_ |= SHUT_WRITE; - } - - VLOG(7) << "AsyncSocket " << this << ": fd " << fd_ - << "successfully connected; state=" << state_; - - // Remember the EventBase we are attached to, before we start invoking any - // callbacks (since the callbacks may call detachEventBase()). - EventBase* originalEventBase = eventBase_; - - // Call the connect callback. - if (connectCallback_) { - ConnectCallback* callback = connectCallback_; - connectCallback_ = nullptr; - callback->connectSuccess(); - } - - // Note that the connect callback may have changed our state. - // (set or unset the read callback, called write(), closed the socket, etc.) - // The following code needs to handle these situations correctly. - // - // If the socket has been closed, readCallback_ and writeReqHead_ will - // always be nullptr, so that will prevent us from trying to read or write. - // - // The main thing to check for is if eventBase_ is still originalEventBase. - // If not, we have been detached from this event base, so we shouldn't - // perform any more operations. - if (eventBase_ != originalEventBase) { - return; - } - - handleInitialReadWrite(); -} - -void AsyncSocket::timeoutExpired() noexcept { - VLOG(7) << "AsyncSocket " << this << ", fd " << fd_ << ": timeout expired: " - << "state=" << state_ << ", events=" << std::hex << eventFlags_; - DestructorGuard dg(this); - assert(eventBase_->isInEventBaseThread()); - - if (state_ == StateEnum::CONNECTING) { - // connect() timed out - // Unregister for I/O events. - AsyncSocketException ex(AsyncSocketException::TIMED_OUT, - "connect timed out"); - failConnect(__func__, ex); - } else { - // a normal write operation timed out - assert(state_ == StateEnum::ESTABLISHED); - AsyncSocketException ex(AsyncSocketException::TIMED_OUT, "write timed out"); - failWrite(__func__, ex); - } -} - -ssize_t AsyncSocket::performWrite(const iovec* vec, - uint32_t count, - WriteFlags flags, - uint32_t* countWritten, - uint32_t* partialWritten) { - // We use sendmsg() instead of writev() so that we can pass in MSG_NOSIGNAL - // We correctly handle EPIPE errors, so we never want to receive SIGPIPE - // (since it may terminate the program if the main program doesn't explicitly - // ignore it). - struct msghdr msg; - msg.msg_name = nullptr; - msg.msg_namelen = 0; - msg.msg_iov = const_cast(vec); -#ifdef IOV_MAX // not defined on Android - msg.msg_iovlen = std::min(count, (uint32_t)IOV_MAX); -#else - msg.msg_iovlen = std::min(count, (uint32_t)UIO_MAXIOV); -#endif - msg.msg_control = nullptr; - msg.msg_controllen = 0; - msg.msg_flags = 0; - - int msg_flags = MSG_DONTWAIT; - -#ifdef MSG_NOSIGNAL // Linux-only - msg_flags |= MSG_NOSIGNAL; - if (isSet(flags, WriteFlags::CORK)) { - // MSG_MORE tells the kernel we have more data to send, so wait for us to - // give it the rest of the data rather than immediately sending a partial - // frame, even when TCP_NODELAY is enabled. - msg_flags |= MSG_MORE; - } -#endif - if (isSet(flags, WriteFlags::EOR)) { - // marks that this is the last byte of a record (response) - msg_flags |= MSG_EOR; - } - ssize_t totalWritten = ::sendmsg(fd_, &msg, msg_flags); - if (totalWritten < 0) { - if (errno == EAGAIN) { - // TCP buffer is full; we can't write any more data right now. - *countWritten = 0; - *partialWritten = 0; - return 0; - } - // error - *countWritten = 0; - *partialWritten = 0; - return -1; - } - - appBytesWritten_ += totalWritten; - - uint32_t bytesWritten; - uint32_t n; - for (bytesWritten = totalWritten, n = 0; n < count; ++n) { - const iovec* v = vec + n; - if (v->iov_len > bytesWritten) { - // Partial write finished in the middle of this iovec - *countWritten = n; - *partialWritten = bytesWritten; - return totalWritten; - } - - bytesWritten -= v->iov_len; - } - - assert(bytesWritten == 0); - *countWritten = n; - *partialWritten = 0; - return totalWritten; -} - -/** - * Re-register the EventHandler after eventFlags_ has changed. - * - * If an error occurs, fail() is called to move the socket into the error state - * and call all currently installed callbacks. After an error, the - * AsyncSocket is completely unregistered. - * - * @return Returns true on succcess, or false on error. - */ -bool AsyncSocket::updateEventRegistration() { - VLOG(5) << "AsyncSocket::updateEventRegistration(this=" << this - << ", fd=" << fd_ << ", evb=" << eventBase_ << ", state=" << state_ - << ", events=" << std::hex << eventFlags_; - assert(eventBase_->isInEventBaseThread()); - if (eventFlags_ == EventHandler::NONE) { - ioHandler_.unregisterHandler(); - return true; - } - - // Always register for persistent events, so we don't have to re-register - // after being called back. - if (!ioHandler_.registerHandler(eventFlags_ | EventHandler::PERSIST)) { - eventFlags_ = EventHandler::NONE; // we're not registered after error - AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR, - withAddr("failed to update AsyncSocket event registration")); - fail("updateEventRegistration", ex); - return false; - } - - return true; -} - -bool AsyncSocket::updateEventRegistration(uint16_t enable, - uint16_t disable) { - uint16_t oldFlags = eventFlags_; - eventFlags_ |= enable; - eventFlags_ &= ~disable; - if (eventFlags_ == oldFlags) { - return true; - } else { - return updateEventRegistration(); - } -} - -void AsyncSocket::startFail() { - // startFail() should only be called once - assert(state_ != StateEnum::ERROR); - assert(getDestructorGuardCount() > 0); - state_ = StateEnum::ERROR; - // Ensure that SHUT_READ and SHUT_WRITE are set, - // so all future attempts to read or write will be rejected - shutdownFlags_ |= (SHUT_READ | SHUT_WRITE); - - if (eventFlags_ != EventHandler::NONE) { - eventFlags_ = EventHandler::NONE; - ioHandler_.unregisterHandler(); - } - writeTimeout_.cancelTimeout(); - - if (fd_ >= 0) { - ioHandler_.changeHandlerFD(-1); - doClose(); - } -} - -void AsyncSocket::finishFail() { - assert(state_ == StateEnum::ERROR); - assert(getDestructorGuardCount() > 0); - - AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR, - withAddr("socket closing after error")); - if (connectCallback_) { - ConnectCallback* callback = connectCallback_; - connectCallback_ = nullptr; - callback->connectErr(ex); - } - - failAllWrites(ex); - - if (readCallback_) { - ReadCallback* callback = readCallback_; - readCallback_ = nullptr; - callback->readErr(ex); - } -} - -void AsyncSocket::fail(const char* fn, const AsyncSocketException& ex) { - VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state=" - << state_ << " host=" << addr_.describe() - << "): failed in " << fn << "(): " - << ex.what(); - startFail(); - finishFail(); -} - -void AsyncSocket::failConnect(const char* fn, const AsyncSocketException& ex) { - VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state=" - << state_ << " host=" << addr_.describe() - << "): failed while connecting in " << fn << "(): " - << ex.what(); - startFail(); - - if (connectCallback_ != nullptr) { - ConnectCallback* callback = connectCallback_; - connectCallback_ = nullptr; - callback->connectErr(ex); - } - - finishFail(); -} - -void AsyncSocket::failRead(const char* fn, const AsyncSocketException& ex) { - VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state=" - << state_ << " host=" << addr_.describe() - << "): failed while reading in " << fn << "(): " - << ex.what(); - startFail(); - - if (readCallback_ != nullptr) { - ReadCallback* callback = readCallback_; - readCallback_ = nullptr; - callback->readErr(ex); - } - - finishFail(); -} - -void AsyncSocket::failWrite(const char* fn, const AsyncSocketException& ex) { - VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state=" - << state_ << " host=" << addr_.describe() - << "): failed while writing in " << fn << "(): " - << ex.what(); - startFail(); - - // Only invoke the first write callback, since the error occurred while - // writing this request. Let any other pending write callbacks be invoked in - // finishFail(). - if (writeReqHead_ != nullptr) { - WriteRequest* req = writeReqHead_; - writeReqHead_ = req->getNext(); - WriteCallback* callback = req->getCallback(); - uint32_t bytesWritten = req->getBytesWritten(); - req->destroy(); - if (callback) { - callback->writeErr(bytesWritten, ex); - } - } - - finishFail(); -} - -void AsyncSocket::failWrite(const char* fn, WriteCallback* callback, - size_t bytesWritten, - const AsyncSocketException& ex) { - // This version of failWrite() is used when the failure occurs before - // we've added the callback to writeReqHead_. - VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state=" - << state_ << " host=" << addr_.describe() - <<"): failed while writing in " << fn << "(): " - << ex.what(); - startFail(); - - if (callback != nullptr) { - callback->writeErr(bytesWritten, ex); - } - - finishFail(); -} - -void AsyncSocket::failAllWrites(const AsyncSocketException& ex) { - // Invoke writeError() on all write callbacks. - // This is used when writes are forcibly shutdown with write requests - // pending, or when an error occurs with writes pending. - while (writeReqHead_ != nullptr) { - WriteRequest* req = writeReqHead_; - writeReqHead_ = req->getNext(); - WriteCallback* callback = req->getCallback(); - if (callback) { - callback->writeErr(req->getBytesWritten(), ex); - } - req->destroy(); - } -} - -void AsyncSocket::invalidState(ConnectCallback* callback) { - VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ - << "): connect() called in invalid state " << state_; - - /* - * The invalidState() methods don't use the normal failure mechanisms, - * since we don't know what state we are in. We don't want to call - * startFail()/finishFail() recursively if we are already in the middle of - * cleaning up. - */ - - AsyncSocketException ex(AsyncSocketException::ALREADY_OPEN, - "connect() called with socket in invalid state"); - if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) { - if (callback) { - callback->connectErr(ex); - } - } else { - // We can't use failConnect() here since connectCallback_ - // may already be set to another callback. Invoke this ConnectCallback - // here; any other connectCallback_ will be invoked in finishFail() - startFail(); - if (callback) { - callback->connectErr(ex); - } - finishFail(); - } -} - -void AsyncSocket::invalidState(ReadCallback* callback) { - VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_ - << "): setReadCallback(" << callback - << ") called in invalid state " << state_; - - AsyncSocketException ex(AsyncSocketException::NOT_OPEN, - "setReadCallback() called with socket in " - "invalid state"); - if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) { - if (callback) { - callback->readErr(ex); - } - } else { - startFail(); - if (callback) { - callback->readErr(ex); - } - finishFail(); - } -} - -void AsyncSocket::invalidState(WriteCallback* callback) { - VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_ - << "): write() called in invalid state " << state_; - - AsyncSocketException ex(AsyncSocketException::NOT_OPEN, - withAddr("write() called with socket in invalid state")); - if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) { - if (callback) { - callback->writeErr(0, ex); - } - } else { - startFail(); - if (callback) { - callback->writeErr(0, ex); - } - finishFail(); - } -} - -void AsyncSocket::doClose() { - if (fd_ == -1) return; - if (shutdownSocketSet_) { - shutdownSocketSet_->close(fd_); - } else { - ::close(fd_); - } - fd_ = -1; -} - -std::ostream& operator << (std::ostream& os, - const AsyncSocket::StateEnum& state) { - os << static_cast(state); - return os; -} - -std::string AsyncSocket::withAddr(const std::string& s) { - // Don't use addr_ directly because it may not be initialized - // e.g. if constructed from fd - folly::SocketAddress peer, local; - try { - getPeerAddress(&peer); - getLocalAddress(&local); - } catch (const std::exception&) { - // ignore - } catch (...) { - // ignore - } - return s + " (peer=" + peer.describe() + ", local=" + local.describe() + ")"; -} - -} // folly diff --git a/folly/io/async/AsyncSocket.h b/folly/io/async/AsyncSocket.h deleted file mode 100644 index 77bd2b0c..00000000 --- a/folly/io/async/AsyncSocket.h +++ /dev/null @@ -1,766 +0,0 @@ -/* - * Copyright 2014 Facebook, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include - -namespace folly { - -/** - * A class for performing asynchronous I/O on a socket. - * - * AsyncSocket allows users to asynchronously wait for data on a socket, and - * to asynchronously send data. - * - * The APIs for reading and writing are intentionally asymmetric. Waiting for - * data to read is a persistent API: a callback is installed, and is notified - * whenever new data is available. It continues to be notified of new events - * until it is uninstalled. - * - * AsyncSocket does not provide read timeout functionality, because it - * typically cannot determine when the timeout should be active. Generally, a - * timeout should only be enabled when processing is blocked waiting on data - * from the remote endpoint. For server sockets, the timeout should not be - * active if the server is currently processing one or more outstanding - * requests for this socket. For client sockets, the timeout should not be - * active if there are no requests pending on the socket. Additionally, if a - * client has multiple pending requests, it will ususally want a separate - * timeout for each request, rather than a single read timeout. - * - * The write API is fairly intuitive: a user can request to send a block of - * data, and a callback will be informed once the entire block has been - * transferred to the kernel, or on error. AsyncSocket does provide a send - * timeout, since most callers want to give up if the remote end stops - * responding and no further progress can be made sending the data. - */ - -class AsyncSocket : virtual public AsyncTransport { - public: - typedef std::unique_ptr UniquePtr; - - class ConnectCallback { - public: - virtual ~ConnectCallback() {} - - /** - * connectSuccess() will be invoked when the connection has been - * successfully established. - */ - virtual void connectSuccess() noexcept = 0; - - /** - * connectErr() will be invoked if the connection attempt fails. - * - * @param ex An exception describing the error that occurred. - */ - virtual void connectErr(const AsyncSocketException& ex) - noexcept = 0; - }; - - class ReadCallback { - public: - virtual ~ReadCallback() {} - - /** - * When data becomes available, getReadBuffer() will be invoked to get the - * buffer into which data should be read. - * - * This method allows the ReadCallback to delay buffer allocation until - * data becomes available. This allows applications to manage large - * numbers of idle connections, without having to maintain a separate read - * buffer for each idle connection. - * - * It is possible that in some cases, getReadBuffer() may be called - * multiple times before readDataAvailable() is invoked. In this case, the - * data will be written to the buffer returned from the most recent call to - * readDataAvailable(). If the previous calls to readDataAvailable() - * returned different buffers, the ReadCallback is responsible for ensuring - * that they are not leaked. - * - * If getReadBuffer() throws an exception, returns a nullptr buffer, or - * returns a 0 length, the ReadCallback will be uninstalled and its - * readError() method will be invoked. - * - * getReadBuffer() is not allowed to change the transport state before it - * returns. (For example, it should never uninstall the read callback, or - * set a different read callback.) - * - * @param bufReturn getReadBuffer() should update *bufReturn to contain the - * address of the read buffer. This parameter will never - * be nullptr. - * @param lenReturn getReadBuffer() should update *lenReturn to contain the - * maximum number of bytes that may be written to the read - * buffer. This parameter will never be nullptr. - */ - virtual void getReadBuffer(void** bufReturn, size_t* lenReturn) = 0; - - /** - * readDataAvailable() will be invoked when data has been successfully read - * into the buffer returned by the last call to getReadBuffer(). - * - * The read callback remains installed after readDataAvailable() returns. - * It must be explicitly uninstalled to stop receiving read events. - * getReadBuffer() will be called at least once before each call to - * readDataAvailable(). getReadBuffer() will also be called before any - * call to readEOF(). - * - * @param len The number of bytes placed in the buffer. - */ - virtual void readDataAvailable(size_t len) noexcept = 0; - - /** - * readEOF() will be invoked when the transport is closed. - * - * The read callback will be automatically uninstalled immediately before - * readEOF() is invoked. - */ - virtual void readEOF() noexcept = 0; - - /** - * readError() will be invoked if an error occurs reading from the - * transport. - * - * The read callback will be automatically uninstalled immediately before - * readError() is invoked. - * - * @param ex An exception describing the error that occurred. - */ - virtual void readErr(const AsyncSocketException& ex) - noexcept = 0; - }; - - class WriteCallback { - public: - virtual ~WriteCallback() {} - - /** - * writeSuccess() will be invoked when all of the data has been - * successfully written. - * - * Note that this mainly signals that the buffer containing the data to - * write is no longer needed and may be freed or re-used. It does not - * guarantee that the data has been fully transmitted to the remote - * endpoint. For example, on socket-based transports, writeSuccess() only - * indicates that the data has been given to the kernel for eventual - * transmission. - */ - virtual void writeSuccess() noexcept = 0; - - /** - * writeError() will be invoked if an error occurs writing the data. - * - * @param bytesWritten The number of bytes that were successfull - * @param ex An exception describing the error that occurred. - */ - virtual void writeErr(size_t bytesWritten, - const AsyncSocketException& ex) - noexcept = 0; - }; - - /** - * Create a new unconnected AsyncSocket. - * - * connect() must later be called on this socket to establish a connection. - */ - explicit AsyncSocket(EventBase* evb); - - void setShutdownSocketSet(ShutdownSocketSet* ss); - - /** - * Create a new AsyncSocket and begin the connection process. - * - * @param evb EventBase that will manage this socket. - * @param address The address to connect to. - * @param connectTimeout Optional timeout in milliseconds for the connection - * attempt. - */ - AsyncSocket(EventBase* evb, - const folly::SocketAddress& address, - uint32_t connectTimeout = 0); - - /** - * Create a new AsyncSocket and begin the connection process. - * - * @param evb EventBase that will manage this socket. - * @param ip IP address to connect to (dotted-quad). - * @param port Destination port in host byte order. - * @param connectTimeout Optional timeout in milliseconds for the connection - * attempt. - */ - AsyncSocket(EventBase* evb, - const std::string& ip, - uint16_t port, - uint32_t connectTimeout = 0); - - /** - * Create a AsyncSocket from an already connected socket file descriptor. - * - * Note that while AsyncSocket enables TCP_NODELAY for sockets it creates - * when connecting, it does not change the socket options when given an - * existing file descriptor. If callers want TCP_NODELAY enabled when using - * this version of the constructor, they need to explicitly call - * setNoDelay(true) after the constructor returns. - * - * @param evb EventBase that will manage this socket. - * @param fd File descriptor to take over (should be a connected socket). - */ - AsyncSocket(EventBase* evb, int fd); - - /** - * Helper function to create a shared_ptr. - * - * This passes in the correct destructor object, since AsyncSocket's - * destructor is protected and cannot be invoked directly. - */ - static std::shared_ptr newSocket(EventBase* evb) { - return std::shared_ptr(new AsyncSocket(evb), - Destructor()); - } - - /** - * Helper function to create a shared_ptr. - */ - static std::shared_ptr newSocket( - EventBase* evb, - const folly::SocketAddress& address, - uint32_t connectTimeout = 0) { - return std::shared_ptr( - new AsyncSocket(evb, address, connectTimeout), - Destructor()); - } - - /** - * Helper function to create a shared_ptr. - */ - static std::shared_ptr newSocket( - EventBase* evb, - const std::string& ip, - uint16_t port, - uint32_t connectTimeout = 0) { - return std::shared_ptr( - new AsyncSocket(evb, ip, port, connectTimeout), - Destructor()); - } - - /** - * Helper function to create a shared_ptr. - */ - static std::shared_ptr newSocket(EventBase* evb, int fd) { - return std::shared_ptr(new AsyncSocket(evb, fd), - Destructor()); - } - - /** - * Destroy the socket. - * - * AsyncSocket::destroy() must be called to destroy the socket. - * The normal destructor is private, and should not be invoked directly. - * This prevents callers from deleting a AsyncSocket while it is invoking a - * callback. - */ - virtual void destroy(); - - /** - * Get the EventBase used by this socket. - */ - EventBase* getEventBase() const override { - return eventBase_; - } - - /** - * Get the file descriptor used by the AsyncSocket. - */ - virtual int getFd() const { - return fd_; - } - - /** - * Extract the file descriptor from the AsyncSocket. - * - * This will immediately cause any installed callbacks to be invoked with an - * error. The AsyncSocket may no longer be used after the file descriptor - * has been extracted. - * - * Returns the file descriptor. The caller assumes ownership of the - * descriptor, and it will not be closed when the AsyncSocket is destroyed. - */ - virtual int detachFd(); - - /** - * Uniquely identifies a handle to a socket option value. Each - * combination of level and option name corresponds to one socket - * option value. - */ - class OptionKey { - public: - bool operator<(const OptionKey& other) const { - if (level == other.level) { - return optname < other.optname; - } - return level < other.level; - } - int apply(int fd, int val) const { - return setsockopt(fd, level, optname, &val, sizeof(val)); - } - int level; - int optname; - }; - - // Maps from a socket option key to its value - typedef std::map OptionMap; - - static const OptionMap emptyOptionMap; - static const folly::SocketAddress anyAddress; - - /** - * Initiate a connection. - * - * @param callback The callback to inform when the connection attempt - * completes. - * @param address The address to connect to. - * @param timeout A timeout value, in milliseconds. If the connection - * does not succeed within this period, - * callback->connectError() will be invoked. - */ - virtual void connect(ConnectCallback* callback, - const folly::SocketAddress& address, - int timeout = 0, - const OptionMap &options = emptyOptionMap, - const folly::SocketAddress& bindAddr = anyAddress - ) noexcept; - void connect(ConnectCallback* callback, const std::string& ip, uint16_t port, - int timeout = 00, - const OptionMap &options = emptyOptionMap) noexcept; - - /** - * Set the send timeout. - * - * If write requests do not make any progress for more than the specified - * number of milliseconds, fail all pending writes and close the socket. - * - * If write requests are currently pending when setSendTimeout() is called, - * the timeout interval is immediately restarted using the new value. - * - * (See the comments for AsyncSocket for an explanation of why AsyncSocket - * provides setSendTimeout() but not setRecvTimeout().) - * - * @param milliseconds The timeout duration, in milliseconds. If 0, no - * timeout will be used. - */ - void setSendTimeout(uint32_t milliseconds) override; - - /** - * Get the send timeout. - * - * @return Returns the current send timeout, in milliseconds. A return value - * of 0 indicates that no timeout is set. - */ - uint32_t getSendTimeout() const override { - return sendTimeout_; - } - - /** - * Set the maximum number of reads to execute from the underlying - * socket each time the EventBase detects that new ingress data is - * available. The default is unlimited, but callers can use this method - * to limit the amount of data read from the socket per event loop - * iteration. - * - * @param maxReads Maximum number of reads per data-available event; - * a value of zero means unlimited. - */ - void setMaxReadsPerEvent(uint16_t maxReads) { - maxReadsPerEvent_ = maxReads; - } - - /** - * Get the maximum number of reads this object will execute from - * the underlying socket each time the EventBase detects that new - * ingress data is available. - * - * @returns Maximum number of reads per data-available event; a value - * of zero means unlimited. - */ - uint16_t getMaxReadsPerEvent() const { - return maxReadsPerEvent_; - } - - // Read and write methods - void setReadCB(ReadCallback* callback); - ReadCallback* getReadCallback() const; - - void write(WriteCallback* callback, const void* buf, size_t bytes, - WriteFlags flags = WriteFlags::NONE); - void writev(WriteCallback* callback, const iovec* vec, size_t count, - WriteFlags flags = WriteFlags::NONE); - void writeChain(WriteCallback* callback, - std::unique_ptr&& buf, - WriteFlags flags = WriteFlags::NONE); - - // Methods inherited from AsyncTransport - void close() override; - void closeNow() override; - void closeWithReset() override; - void shutdownWrite() override; - void shutdownWriteNow() override; - - bool readable() const override; - bool isPending() const override; - virtual bool hangup() const; - bool good() const override; - bool error() const override; - void attachEventBase(EventBase* eventBase) override; - void detachEventBase() override; - bool isDetachable() const override; - - void getLocalAddress( - folly::SocketAddress* address) const override; - void getPeerAddress( - folly::SocketAddress* address) const override; - - bool isEorTrackingEnabled() const override { return false; } - - void setEorTracking(bool track) override {} - - bool connecting() const override { - return (state_ == StateEnum::CONNECTING); - } - - size_t getAppBytesWritten() const override { - return appBytesWritten_; - } - - size_t getRawBytesWritten() const override { - return getAppBytesWritten(); - } - - size_t getAppBytesReceived() const override { - return appBytesReceived_; - } - - size_t getRawBytesReceived() const override { - return getAppBytesReceived(); - } - - // Methods controlling socket options - - /** - * Force writes to be transmitted immediately. - * - * This controls the TCP_NODELAY socket option. When enabled, TCP segments - * are sent as soon as possible, even if it is not a full frame of data. - * When disabled, the data may be buffered briefly to try and wait for a full - * frame of data. - * - * By default, TCP_NODELAY is enabled for AsyncSocket objects. - * - * This method will fail if the socket is not currently open. - * - * @return Returns 0 if the TCP_NODELAY flag was successfully updated, - * or a non-zero errno value on error. - */ - int setNoDelay(bool noDelay); - - /* - * Set the Flavor of Congestion Control to be used for this Socket - * Please check '/lib/modules//kernel/net/ipv4' for tcp_*.ko - * first to make sure the module is available for plugging in - * Alternatively you can choose from net.ipv4.tcp_allowed_congestion_control - */ - int setCongestionFlavor(const std::string &cname); - - /* - * Forces ACKs to be sent immediately - * - * @return Returns 0 if the TCP_QUICKACK flag was successfully updated, - * or a non-zero errno value on error. - */ - int setQuickAck(bool quickack); - - /** - * Set the send bufsize - */ - int setSendBufSize(size_t bufsize); - - /** - * Set the recv bufsize - */ - int setRecvBufSize(size_t bufsize); - - /** - * Sets a specific tcp personality - * Available only on kernels 3.2 and greater - */ - #define SO_SET_NAMESPACE 41 - int setTCPProfile(int profd); - - - /** - * Generic API for reading a socket option. - * - * @param level same as the "level" parameter in getsockopt(). - * @param optname same as the "optname" parameter in getsockopt(). - * @param optval pointer to the variable in which the option value should - * be returned. - * @return same as the return value of getsockopt(). - */ - template - int getSockOpt(int level, int optname, T *optval) { - return getsockopt(fd_, level, optname, optval, sizeof(T)); - } - - /** - * Generic API for setting a socket option. - * - * @param level same as the "level" parameter in getsockopt(). - * @param optname same as the "optname" parameter in getsockopt(). - * @param optval the option value to set. - * @return same as the return value of setsockopt(). - */ - template - int setSockOpt(int level, int optname, const T *optval) { - return setsockopt(fd_, level, optname, optval, sizeof(T)); - } - - protected: - enum ReadResultEnum { - READ_EOF = 0, - READ_ERROR = -1, - READ_BLOCKING = -2, - }; - - /** - * Protected destructor. - * - * Users of AsyncSocket must never delete it directly. Instead, invoke - * destroy() instead. (See the documentation in DelayedDestruction.h for - * more details.) - */ - ~AsyncSocket(); - - enum class StateEnum : uint8_t { - UNINIT, - CONNECTING, - ESTABLISHED, - CLOSED, - ERROR - }; - - friend std::ostream& operator << (std::ostream& os, const StateEnum& state); - - enum ShutdownFlags { - /// shutdownWrite() called, but we are still waiting on writes to drain - SHUT_WRITE_PENDING = 0x01, - /// writes have been completely shut down - SHUT_WRITE = 0x02, - /** - * Reads have been shutdown. - * - * At the moment we don't distinguish between remote read shutdown - * (received EOF from the remote end) and local read shutdown. We can - * only receive EOF when a read callback is set, and we immediately inform - * it of the EOF. Therefore there doesn't seem to be any reason to have a - * separate state of "received EOF but the local side may still want to - * read". - * - * We also don't currently provide any API for only shutting down the read - * side of a socket. (This is a no-op as far as TCP is concerned, anyway.) - */ - SHUT_READ = 0x04, - }; - - class WriteRequest; - - class WriteTimeout : public AsyncTimeout { - public: - WriteTimeout(AsyncSocket* socket, EventBase* eventBase) - : AsyncTimeout(eventBase) - , socket_(socket) {} - - virtual void timeoutExpired() noexcept { - socket_->timeoutExpired(); - } - - private: - AsyncSocket* socket_; - }; - - class IoHandler : public EventHandler { - public: - IoHandler(AsyncSocket* socket, EventBase* eventBase) - : EventHandler(eventBase, -1) - , socket_(socket) {} - IoHandler(AsyncSocket* socket, EventBase* eventBase, int fd) - : EventHandler(eventBase, fd) - , socket_(socket) {} - - virtual void handlerReady(uint16_t events) noexcept { - socket_->ioReady(events); - } - - private: - AsyncSocket* socket_; - }; - - void init(); - - // event notification methods - void ioReady(uint16_t events) noexcept; - virtual void checkForImmediateRead() noexcept; - virtual void handleInitialReadWrite() noexcept; - virtual void handleRead() noexcept; - virtual void handleWrite() noexcept; - virtual void handleConnect() noexcept; - void timeoutExpired() noexcept; - - /** - * Attempt to read from the socket. - * - * @param buf The buffer to read data into. - * @param buflen The length of the buffer. - * - * @return Returns the number of bytes read, or READ_EOF on EOF, or - * READ_ERROR on error, or READ_BLOCKING if the operation will - * block. - */ - virtual ssize_t performRead(void* buf, size_t buflen); - - /** - * Populate an iovec array from an IOBuf and attempt to write it. - * - * @param callback Write completion/error callback. - * @param vec Target iovec array; caller retains ownership. - * @param count Number of IOBufs to write, beginning at start of buf. - * @param buf Chain of iovecs. - * @param flags set of flags for the underlying write calls, like cork - */ - void writeChainImpl(WriteCallback* callback, iovec* vec, - size_t count, std::unique_ptr&& buf, - WriteFlags flags); - - /** - * Write as much data as possible to the socket without blocking, - * and queue up any leftover data to send when the socket can - * handle writes again. - * - * @param callback The callback to invoke when the write is completed. - * @param vec Array of buffers to write; this method will make a - * copy of the vector (but not the buffers themselves) - * if the write has to be completed asynchronously. - * @param count Number of elements in vec. - * @param buf The IOBuf that manages the buffers referenced by - * vec, or a pointer to nullptr if the buffers are not - * associated with an IOBuf. Note that ownership of - * the IOBuf is transferred here; upon completion of - * the write, the AsyncSocket deletes the IOBuf. - * @param flags Set of write flags. - */ - void writeImpl(WriteCallback* callback, const iovec* vec, size_t count, - std::unique_ptr&& buf, - WriteFlags flags = WriteFlags::NONE); - - /** - * Attempt to write to the socket. - * - * @param vec The iovec array pointing to the buffers to write. - * @param count The length of the iovec array. - * @param flags Set of write flags. - * @param countWritten On return, the value pointed to by this parameter - * will contain the number of iovec entries that were - * fully written. - * @param partialWritten On return, the value pointed to by this parameter - * will contain the number of bytes written in the - * partially written iovec entry. - * - * @return Returns the total number of bytes written, or -1 on error. If no - * data can be written immediately, 0 is returned. - */ - virtual ssize_t performWrite(const iovec* vec, uint32_t count, - WriteFlags flags, uint32_t* countWritten, - uint32_t* partialWritten); - - bool updateEventRegistration(); - - /** - * Update event registration. - * - * @param enable Flags of events to enable. Set it to 0 if no events - * need to be enabled in this call. - * @param disable Flags of events - * to disable. Set it to 0 if no events need to be disabled in this - * call. - * - * @return true iff the update is successful. - */ - bool updateEventRegistration(uint16_t enable, uint16_t disable); - - // Actually close the file descriptor and set it to -1 so we don't - // accidentally close it again. - void doClose(); - - // error handling methods - void startFail(); - void finishFail(); - void fail(const char* fn, const AsyncSocketException& ex); - void failConnect(const char* fn, const AsyncSocketException& ex); - void failRead(const char* fn, const AsyncSocketException& ex); - void failWrite(const char* fn, WriteCallback* callback, size_t bytesWritten, - const AsyncSocketException& ex); - void failWrite(const char* fn, const AsyncSocketException& ex); - void failAllWrites(const AsyncSocketException& ex); - void invalidState(ConnectCallback* callback); - void invalidState(ReadCallback* callback); - void invalidState(WriteCallback* callback); - - std::string withAddr(const std::string& s); - - StateEnum state_; ///< StateEnum describing current state - uint8_t shutdownFlags_; ///< Shutdown state (ShutdownFlags) - uint16_t eventFlags_; ///< EventBase::HandlerFlags settings - int fd_; ///< The socket file descriptor - mutable - folly::SocketAddress addr_; ///< The address we tried to connect to - uint32_t sendTimeout_; ///< The send timeout, in milliseconds - uint16_t maxReadsPerEvent_; ///< Max reads per event loop iteration - EventBase* eventBase_; ///< The EventBase - WriteTimeout writeTimeout_; ///< A timeout for connect and write - IoHandler ioHandler_; ///< A EventHandler to monitor the fd - - ConnectCallback* connectCallback_; ///< ConnectCallback - ReadCallback* readCallback_; ///< ReadCallback - WriteRequest* writeReqHead_; ///< Chain of WriteRequests - WriteRequest* writeReqTail_; ///< End of WriteRequest chain - ShutdownSocketSet* shutdownSocketSet_; - size_t appBytesReceived_; ///< Num of bytes received from socket - size_t appBytesWritten_; ///< Num of bytes written to socket -}; - - -} // folly diff --git a/folly/io/async/AsyncSocketException.h b/folly/io/async/AsyncSocketException.h deleted file mode 100644 index 762e9bc0..00000000 --- a/folly/io/async/AsyncSocketException.h +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Copyright 2014 Facebook, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include - -namespace folly { - -class AsyncSocketException : public std::runtime_error { - public: - enum AsyncSocketExceptionType - { UNKNOWN = 0 - , NOT_OPEN = 1 - , ALREADY_OPEN = 2 - , TIMED_OUT = 3 - , END_OF_FILE = 4 - , INTERRUPTED = 5 - , BAD_ARGS = 6 - , CORRUPTED_DATA = 7 - , INTERNAL_ERROR = 8 - , NOT_SUPPORTED = 9 - , INVALID_STATE = 10 - , SSL_ERROR = 12 - , COULD_NOT_BIND = 13 - , SASL_HANDSHAKE_TIMEOUT = 14 - }; - - AsyncSocketException( - AsyncSocketExceptionType type, const std::string& message) : - std::runtime_error(message), - type_(type), errno_(0) {} - - /** Error code */ - AsyncSocketExceptionType type_; - - /** A copy of the errno. */ - int errno_; - - AsyncSocketException(AsyncSocketExceptionType type, - const std::string& message, - int errno_copy) : - std::runtime_error(getMessage(message, errno_copy)), - type_(type), errno_(errno_copy) {} - - AsyncSocketExceptionType getType() const noexcept { return type_; } - int getErrno() const noexcept { return errno_; } - - protected: - /** Just like strerror_r but returns a C++ string object. */ - std::string strerror_s(int errno_copy) { - return "errno = " + folly::to(errno_copy); - } - - /** Return a message based on the input. */ - std::string getMessage(const std::string &message, - int errno_copy) { - if (errno_copy != 0) { - return message + ": " + strerror_s(errno_copy); - } else { - return message; - } - } -}; - -} // folly diff --git a/folly/io/async/AsyncTransport.h b/folly/io/async/AsyncTransport.h deleted file mode 100644 index 0b929c16..00000000 --- a/folly/io/async/AsyncTransport.h +++ /dev/null @@ -1,317 +0,0 @@ -/* - * Copyright 2014 Facebook, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -namespace folly { - -/* - * flags given by the application for write* calls - */ -enum class WriteFlags : uint32_t { - NONE = 0x00, - /* - * Whether to delay the output until a subsequent non-corked write. - * (Note: may not be supported in all subclasses or on all platforms.) - */ - CORK = 0x01, - /* - * for a socket that has ACK latency enabled, it will cause the kernel - * to fire a TCP ESTATS event when the last byte of the given write call - * will be acknowledged. - */ - EOR = 0x02, -}; - -/* - * union operator - */ -inline WriteFlags operator|(WriteFlags a, WriteFlags b) { - return static_cast( - static_cast(a) | static_cast(b)); -} - -/* - * intersection operator - */ -inline WriteFlags operator&(WriteFlags a, WriteFlags b) { - return static_cast( - static_cast(a) & static_cast(b)); -} - -/* - * exclusion parameter - */ -inline WriteFlags operator~(WriteFlags a) { - return static_cast(~static_cast(a)); -} - -/* - * unset operator - */ -inline WriteFlags unSet(WriteFlags a, WriteFlags b) { - return a & ~b; -} - -/* - * inclusion operator - */ -inline bool isSet(WriteFlags a, WriteFlags b) { - return (a & b) == b; -} - - -/** - * AsyncTransport defines an asynchronous API for streaming I/O. - * - * This class provides an API to for asynchronously waiting for data - * on a streaming transport, and for asynchronously sending data. - * - * The APIs for reading and writing are intentionally asymmetric. Waiting for - * data to read is a persistent API: a callback is installed, and is notified - * whenever new data is available. It continues to be notified of new events - * until it is uninstalled. - * - * AsyncTransport does not provide read timeout functionality, because it - * typically cannot determine when the timeout should be active. Generally, a - * timeout should only be enabled when processing is blocked waiting on data - * from the remote endpoint. For server-side applications, the timeout should - * not be active if the server is currently processing one or more outstanding - * requests on this transport. For client-side applications, the timeout - * should not be active if there are no requests pending on the transport. - * Additionally, if a client has multiple pending requests, it will ususally - * want a separate timeout for each request, rather than a single read timeout. - * - * The write API is fairly intuitive: a user can request to send a block of - * data, and a callback will be informed once the entire block has been - * transferred to the kernel, or on error. AsyncTransport does provide a send - * timeout, since most callers want to give up if the remote end stops - * responding and no further progress can be made sending the data. - */ -class AsyncTransport : public DelayedDestruction { - public: - typedef std::unique_ptr UniquePtr; - - /** - * Close the transport. - * - * This gracefully closes the transport, waiting for all pending write - * requests to complete before actually closing the underlying transport. - * - * If a read callback is set, readEOF() will be called immediately. If there - * are outstanding write requests, the close will be delayed until all - * remaining writes have completed. No new writes may be started after - * close() has been called. - */ - virtual void close() = 0; - - /** - * Close the transport immediately. - * - * This closes the transport immediately, dropping any outstanding data - * waiting to be written. - * - * If a read callback is set, readEOF() will be called immediately. - * If there are outstanding write requests, these requests will be aborted - * and writeError() will be invoked immediately on all outstanding write - * callbacks. - */ - virtual void closeNow() = 0; - - /** - * Reset the transport immediately. - * - * This closes the transport immediately, sending a reset to the remote peer - * if possible to indicate abnormal shutdown. - * - * Note that not all subclasses implement this reset functionality: some - * subclasses may treat reset() the same as closeNow(). Subclasses that use - * TCP transports should terminate the connection with a TCP reset. - */ - virtual void closeWithReset() { - closeNow(); - } - - /** - * Perform a half-shutdown of the write side of the transport. - * - * The caller should not make any more calls to write() or writev() after - * shutdownWrite() is called. Any future write attempts will fail - * immediately. - * - * Not all transport types support half-shutdown. If the underlying - * transport does not support half-shutdown, it will fully shutdown both the - * read and write sides of the transport. (Fully shutting down the socket is - * better than doing nothing at all, since the caller may rely on the - * shutdownWrite() call to notify the other end of the connection that no - * more data can be read.) - * - * If there is pending data still waiting to be written on the transport, - * the actual shutdown will be delayed until the pending data has been - * written. - * - * Note: There is no corresponding shutdownRead() equivalent. Simply - * uninstall the read callback if you wish to stop reading. (On TCP sockets - * at least, shutting down the read side of the socket is a no-op anyway.) - */ - virtual void shutdownWrite() = 0; - - /** - * Perform a half-shutdown of the write side of the transport. - * - * shutdownWriteNow() is identical to shutdownWrite(), except that it - * immediately performs the shutdown, rather than waiting for pending writes - * to complete. Any pending write requests will be immediately failed when - * shutdownWriteNow() is called. - */ - virtual void shutdownWriteNow() = 0; - - /** - * Determine if transport is open and ready to read or write. - * - * Note that this function returns false on EOF; you must also call error() - * to distinguish between an EOF and an error. - * - * @return true iff the transport is open and ready, false otherwise. - */ - virtual bool good() const = 0; - - /** - * Determine if the transport is readable or not. - * - * @return true iff the transport is readable, false otherwise. - */ - virtual bool readable() const = 0; - - /** - * Determine if the there is pending data on the transport. - * - * @return true iff the if the there is pending data, false otherwise. - */ - virtual bool isPending() const { - return readable(); - } - /** - * Determine if transport is connected to the endpoint - * - * @return false iff the transport is connected, otherwise true - */ - virtual bool connecting() const = 0; - - /** - * Determine if an error has occurred with this transport. - * - * @return true iff an error has occurred (not EOF). - */ - virtual bool error() const = 0; - - /** - * Attach the transport to a EventBase. - * - * This may only be called if the transport is not currently attached to a - * EventBase (by an earlier call to detachEventBase()). - * - * This method must be invoked in the EventBase's thread. - */ - virtual void attachEventBase(EventBase* eventBase) = 0; - - /** - * Detach the transport from its EventBase. - * - * This may only be called when the transport is idle and has no reads or - * writes pending. Once detached, the transport may not be used again until - * it is re-attached to a EventBase by calling attachEventBase(). - * - * This method must be called from the current EventBase's thread. - */ - virtual void detachEventBase() = 0; - - /** - * Determine if the transport can be detached. - * - * This method must be called from the current EventBase's thread. - */ - virtual bool isDetachable() const = 0; - - /** - * Get the EventBase used by this transport. - * - * Returns nullptr if this transport is not currently attached to a - * EventBase. - */ - virtual EventBase* getEventBase() const = 0; - - /** - * Set the send timeout. - * - * If write requests do not make any progress for more than the specified - * number of milliseconds, fail all pending writes and close the transport. - * - * If write requests are currently pending when setSendTimeout() is called, - * the timeout interval is immediately restarted using the new value. - * - * @param milliseconds The timeout duration, in milliseconds. If 0, no - * timeout will be used. - */ - virtual void setSendTimeout(uint32_t milliseconds) = 0; - - /** - * Get the send timeout. - * - * @return Returns the current send timeout, in milliseconds. A return value - * of 0 indicates that no timeout is set. - */ - virtual uint32_t getSendTimeout() const = 0; - - /** - * Get the address of the local endpoint of this transport. - * - * This function may throw AsyncSocketException on error. - * - * @param address The local address will be stored in the specified - * SocketAddress. - */ - virtual void getLocalAddress(folly::SocketAddress* address) const = 0; - - /** - * Get the address of the remote endpoint to which this transport is - * connected. - * - * This function may throw AsyncSocketException on error. - * - * @param address The remote endpoint's address will be stored in the - * specified SocketAddress. - */ - virtual void getPeerAddress(folly::SocketAddress* address) const = 0; - - /** - * @return True iff end of record tracking is enabled - */ - virtual bool isEorTrackingEnabled() const = 0; - - virtual void setEorTracking(bool track) = 0; - - virtual size_t getAppBytesWritten() const = 0; - virtual size_t getRawBytesWritten() const = 0; - virtual size_t getAppBytesReceived() const = 0; - virtual size_t getRawBytesReceived() const = 0; - - protected: - virtual ~AsyncTransport() {} -}; - - -} // folly -- 2.34.1