+++ /dev/null
-/*
- * Copyright 2014 Facebook, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <folly/io/async/AsyncSocket.h>
-
-#include <folly/io/async/EventBase.h>
-#include <folly/SocketAddress.h>
-#include <folly/io/IOBuf.h>
-
-#include <poll.h>
-#include <errno.h>
-#include <limits.h>
-#include <unistd.h>
-#include <fcntl.h>
-#include <sys/types.h>
-#include <sys/socket.h>
-#include <netinet/in.h>
-#include <netinet/tcp.h>
-
-using std::string;
-using std::unique_ptr;
-
-namespace folly {
-
-// static members initializers
-const AsyncSocket::OptionMap AsyncSocket::emptyOptionMap;
-const folly::SocketAddress AsyncSocket::anyAddress =
- folly::SocketAddress("0.0.0.0", 0);
-
-const AsyncSocketException socketClosedLocallyEx(
- AsyncSocketException::END_OF_FILE, "socket closed locally");
-const AsyncSocketException socketShutdownForWritesEx(
- AsyncSocketException::END_OF_FILE, "socket shutdown for writes");
-
-// TODO: It might help performance to provide a version of WriteRequest that
-// users could derive from, so we can avoid the extra allocation for each call
-// to write()/writev(). We could templatize TFramedAsyncChannel just like the
-// protocols are currently templatized for transports.
-//
-// We would need the version for external users where they provide the iovec
-// storage space, and only our internal version would allocate it at the end of
-// the WriteRequest.
-
-/**
- * A WriteRequest object tracks information about a pending write() or writev()
- * operation.
- *
- * A new WriteRequest operation is allocated on the heap for all write
- * operations that cannot be completed immediately.
- */
-class AsyncSocket::WriteRequest {
- public:
- static WriteRequest* newRequest(WriteCallback* callback,
- const iovec* ops,
- uint32_t opCount,
- unique_ptr<IOBuf>&& ioBuf,
- WriteFlags flags) {
- assert(opCount > 0);
- // Since we put a variable size iovec array at the end
- // of each WriteRequest, we have to manually allocate the memory.
- void* buf = malloc(sizeof(WriteRequest) +
- (opCount * sizeof(struct iovec)));
- if (buf == nullptr) {
- throw std::bad_alloc();
- }
-
- return new(buf) WriteRequest(callback, ops, opCount, std::move(ioBuf),
- flags);
- }
-
- void destroy() {
- this->~WriteRequest();
- free(this);
- }
-
- bool cork() const {
- return isSet(flags_, WriteFlags::CORK);
- }
-
- WriteFlags flags() const {
- return flags_;
- }
-
- WriteRequest* getNext() const {
- return next_;
- }
-
- WriteCallback* getCallback() const {
- return callback_;
- }
-
- uint32_t getBytesWritten() const {
- return bytesWritten_;
- }
-
- const struct iovec* getOps() const {
- assert(opCount_ > opIndex_);
- return writeOps_ + opIndex_;
- }
-
- uint32_t getOpCount() const {
- assert(opCount_ > opIndex_);
- return opCount_ - opIndex_;
- }
-
- void consume(uint32_t wholeOps, uint32_t partialBytes,
- uint32_t totalBytesWritten) {
- // Advance opIndex_ forward by wholeOps
- opIndex_ += wholeOps;
- assert(opIndex_ < opCount_);
-
- // If we've finished writing any IOBufs, release them
- if (ioBuf_) {
- for (uint32_t i = wholeOps; i != 0; --i) {
- assert(ioBuf_);
- ioBuf_ = ioBuf_->pop();
- }
- }
-
- // Move partialBytes forward into the current iovec buffer
- struct iovec* currentOp = writeOps_ + opIndex_;
- assert((partialBytes < currentOp->iov_len) || (currentOp->iov_len == 0));
- currentOp->iov_base =
- reinterpret_cast<uint8_t*>(currentOp->iov_base) + partialBytes;
- currentOp->iov_len -= partialBytes;
-
- // Increment the bytesWritten_ count by totalBytesWritten
- bytesWritten_ += totalBytesWritten;
- }
-
- void append(WriteRequest* next) {
- assert(next_ == nullptr);
- next_ = next;
- }
-
- private:
- WriteRequest(WriteCallback* callback,
- const struct iovec* ops,
- uint32_t opCount,
- unique_ptr<IOBuf>&& ioBuf,
- WriteFlags flags)
- : next_(nullptr)
- , callback_(callback)
- , bytesWritten_(0)
- , opCount_(opCount)
- , opIndex_(0)
- , flags_(flags)
- , ioBuf_(std::move(ioBuf)) {
- memcpy(writeOps_, ops, sizeof(*ops) * opCount_);
- }
-
- // Private destructor, to ensure callers use destroy()
- ~WriteRequest() {}
-
- WriteRequest* next_; ///< pointer to next WriteRequest
- WriteCallback* callback_; ///< completion callback
- uint32_t bytesWritten_; ///< bytes written
- uint32_t opCount_; ///< number of entries in writeOps_
- uint32_t opIndex_; ///< current index into writeOps_
- WriteFlags flags_; ///< set for WriteFlags
- unique_ptr<IOBuf> ioBuf_; ///< underlying IOBuf, or nullptr if N/A
- struct iovec writeOps_[]; ///< write operation(s) list
-};
-
-AsyncSocket::AsyncSocket(EventBase* evb)
- : eventBase_(evb)
- , writeTimeout_(this, evb)
- , ioHandler_(this, evb) {
- VLOG(5) << "new AsyncSocket(" << this << ", evb=" << evb << ")";
- init();
-}
-
-AsyncSocket::AsyncSocket(EventBase* evb,
- const folly::SocketAddress& address,
- uint32_t connectTimeout)
- : eventBase_(evb)
- , writeTimeout_(this, evb)
- , ioHandler_(this, evb) {
- VLOG(5) << "new AsyncSocket(" << this << ", evb=" << evb << ")";
- init();
- connect(nullptr, address, connectTimeout);
-}
-
-AsyncSocket::AsyncSocket(EventBase* evb,
- const std::string& ip,
- uint16_t port,
- uint32_t connectTimeout)
- : eventBase_(evb)
- , writeTimeout_(this, evb)
- , ioHandler_(this, evb) {
- VLOG(5) << "new AsyncSocket(" << this << ", evb=" << evb << ")";
- init();
- connect(nullptr, ip, port, connectTimeout);
-}
-
-AsyncSocket::AsyncSocket(EventBase* evb, int fd)
- : eventBase_(evb)
- , writeTimeout_(this, evb)
- , ioHandler_(this, evb, fd) {
- VLOG(5) << "new AsyncSocket(" << this << ", evb=" << evb << ", fd="
- << fd << ")";
- init();
- fd_ = fd;
- state_ = StateEnum::ESTABLISHED;
-}
-
-// init() method, since constructor forwarding isn't supported in most
-// compilers yet.
-void AsyncSocket::init() {
- assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
- shutdownFlags_ = 0;
- state_ = StateEnum::UNINIT;
- eventFlags_ = EventHandler::NONE;
- fd_ = -1;
- sendTimeout_ = 0;
- maxReadsPerEvent_ = 0;
- connectCallback_ = nullptr;
- readCallback_ = nullptr;
- writeReqHead_ = nullptr;
- writeReqTail_ = nullptr;
- shutdownSocketSet_ = nullptr;
- appBytesWritten_ = 0;
- appBytesReceived_ = 0;
-}
-
-AsyncSocket::~AsyncSocket() {
- VLOG(7) << "actual destruction of AsyncSocket(this=" << this
- << ", evb=" << eventBase_ << ", fd=" << fd_
- << ", state=" << state_ << ")";
-}
-
-void AsyncSocket::destroy() {
- VLOG(5) << "AsyncSocket::destroy(this=" << this << ", evb=" << eventBase_
- << ", fd=" << fd_ << ", state=" << state_;
- // When destroy is called, close the socket immediately
- closeNow();
-
- // Then call DelayedDestruction::destroy() to take care of
- // whether or not we need immediate or delayed destruction
- DelayedDestruction::destroy();
-}
-
-int AsyncSocket::detachFd() {
- VLOG(6) << "AsyncSocket::detachFd(this=" << this << ", fd=" << fd_
- << ", evb=" << eventBase_ << ", state=" << state_
- << ", events=" << std::hex << eventFlags_ << ")";
- // Extract the fd, and set fd_ to -1 first, so closeNow() won't
- // actually close the descriptor.
- if (shutdownSocketSet_) {
- shutdownSocketSet_->remove(fd_);
- }
- int fd = fd_;
- fd_ = -1;
- // Call closeNow() to invoke all pending callbacks with an error.
- closeNow();
- // Update the EventHandler to stop using this fd.
- // This can only be done after closeNow() unregisters the handler.
- ioHandler_.changeHandlerFD(-1);
- return fd;
-}
-
-void AsyncSocket::setShutdownSocketSet(ShutdownSocketSet* newSS) {
- if (shutdownSocketSet_ == newSS) {
- return;
- }
- if (shutdownSocketSet_ && fd_ != -1) {
- shutdownSocketSet_->remove(fd_);
- }
- shutdownSocketSet_ = newSS;
- if (shutdownSocketSet_ && fd_ != -1) {
- shutdownSocketSet_->add(fd_);
- }
-}
-
-void AsyncSocket::connect(ConnectCallback* callback,
- const folly::SocketAddress& address,
- int timeout,
- const OptionMap &options,
- const folly::SocketAddress& bindAddr) noexcept {
- DestructorGuard dg(this);
- assert(eventBase_->isInEventBaseThread());
-
- addr_ = address;
-
- // Make sure we're in the uninitialized state
- if (state_ != StateEnum::UNINIT) {
- return invalidState(callback);
- }
-
- assert(fd_ == -1);
- state_ = StateEnum::CONNECTING;
- connectCallback_ = callback;
-
- sockaddr_storage addrStorage;
- sockaddr* saddr = reinterpret_cast<sockaddr*>(&addrStorage);
-
- try {
- // Create the socket
- // Technically the first parameter should actually be a protocol family
- // constant (PF_xxx) rather than an address family (AF_xxx), but the
- // distinction is mainly just historical. In pretty much all
- // implementations the PF_foo and AF_foo constants are identical.
- fd_ = socket(address.getFamily(), SOCK_STREAM, 0);
- if (fd_ < 0) {
- throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
- withAddr("failed to create socket"), errno);
- }
- if (shutdownSocketSet_) {
- shutdownSocketSet_->add(fd_);
- }
- ioHandler_.changeHandlerFD(fd_);
-
- // Set the FD_CLOEXEC flag so that the socket will be closed if the program
- // later forks and execs.
- int rv = fcntl(fd_, F_SETFD, FD_CLOEXEC);
- if (rv != 0) {
- throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
- withAddr("failed to set close-on-exec flag"),
- errno);
- }
-
- // Put the socket in non-blocking mode
- int flags = fcntl(fd_, F_GETFL, 0);
- if (flags == -1) {
- throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
- withAddr("failed to get socket flags"), errno);
- }
- rv = fcntl(fd_, F_SETFL, flags | O_NONBLOCK);
- if (rv == -1) {
- throw AsyncSocketException(
- AsyncSocketException::INTERNAL_ERROR,
- withAddr("failed to put socket in non-blocking mode"),
- errno);
- }
-
-#if !defined(MSG_NOSIGNAL) && defined(F_SETNOSIGPIPE)
- // iOS and OS X don't support MSG_NOSIGNAL; set F_SETNOSIGPIPE instead
- rv = fcntl(fd_, F_SETNOSIGPIPE, 1);
- if (rv == -1) {
- throw AsyncSocketException(
- AsyncSocketException::INTERNAL_ERROR,
- "failed to enable F_SETNOSIGPIPE on socket",
- errno);
- }
-#endif
-
- // By default, turn on TCP_NODELAY
- // If setNoDelay() fails, we continue anyway; this isn't a fatal error.
- // setNoDelay() will log an error message if it fails.
- if (address.getFamily() != AF_UNIX) {
- (void)setNoDelay(true);
- }
-
- VLOG(5) << "AsyncSocket::connect(this=" << this << ", evb=" << eventBase_
- << ", fd=" << fd_ << ", host=" << address.describe().c_str();
-
- // bind the socket
- if (bindAddr != anyAddress) {
- int one = 1;
- if (::setsockopt(fd_, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one))) {
- doClose();
- throw AsyncSocketException(
- AsyncSocketException::NOT_OPEN,
- "failed to setsockopt prior to bind on " + bindAddr.describe(),
- errno);
- }
-
- bindAddr.getAddress(&addrStorage);
-
- if (::bind(fd_, saddr, bindAddr.getActualSize()) != 0) {
- doClose();
- throw AsyncSocketException(AsyncSocketException::NOT_OPEN,
- "failed to bind to async socket: " +
- bindAddr.describe(),
- errno);
- }
- }
-
- // Apply the additional options if any.
- for (const auto& opt: options) {
- int rv = opt.first.apply(fd_, opt.second);
- if (rv != 0) {
- throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
- withAddr("failed to set socket option"),
- errno);
- }
- }
-
- // Perform the connect()
- address.getAddress(&addrStorage);
-
- rv = ::connect(fd_, saddr, address.getActualSize());
- if (rv < 0) {
- if (errno == EINPROGRESS) {
- // Connection in progress.
- if (timeout > 0) {
- // Start a timer in case the connection takes too long.
- if (!writeTimeout_.scheduleTimeout(timeout)) {
- throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
- withAddr("failed to schedule AsyncSocket connect timeout"));
- }
- }
-
- // Register for write events, so we'll
- // be notified when the connection finishes/fails.
- // Note that we don't register for a persistent event here.
- assert(eventFlags_ == EventHandler::NONE);
- eventFlags_ = EventHandler::WRITE;
- if (!ioHandler_.registerHandler(eventFlags_)) {
- throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
- withAddr("failed to register AsyncSocket connect handler"));
- }
- return;
- } else {
- throw AsyncSocketException(AsyncSocketException::NOT_OPEN,
- "connect failed (immediately)", errno);
- }
- }
-
- // If we're still here the connect() succeeded immediately.
- // Fall through to call the callback outside of this try...catch block
- } catch (const AsyncSocketException& ex) {
- return failConnect(__func__, ex);
- } catch (const std::exception& ex) {
- // shouldn't happen, but handle it just in case
- VLOG(4) << "AsyncSocket::connect(this=" << this << ", fd=" << fd_
- << "): unexpected " << typeid(ex).name() << " exception: "
- << ex.what();
- AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR,
- withAddr(string("unexpected exception: ") +
- ex.what()));
- return failConnect(__func__, tex);
- }
-
- // The connection succeeded immediately
- // The read callback may not have been set yet, and no writes may be pending
- // yet, so we don't have to register for any events at the moment.
- VLOG(8) << "AsyncSocket::connect succeeded immediately; this=" << this;
- assert(readCallback_ == nullptr);
- assert(writeReqHead_ == nullptr);
- state_ = StateEnum::ESTABLISHED;
- if (callback) {
- connectCallback_ = nullptr;
- callback->connectSuccess();
- }
-}
-
-void AsyncSocket::connect(ConnectCallback* callback,
- const string& ip, uint16_t port,
- int timeout,
- const OptionMap &options) noexcept {
- DestructorGuard dg(this);
- try {
- connectCallback_ = callback;
- connect(callback, folly::SocketAddress(ip, port), timeout, options);
- } catch (const std::exception& ex) {
- AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR,
- ex.what());
- return failConnect(__func__, tex);
- }
-}
-
-void AsyncSocket::setSendTimeout(uint32_t milliseconds) {
- sendTimeout_ = milliseconds;
- assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
-
- // If we are currently pending on write requests, immediately update
- // writeTimeout_ with the new value.
- if ((eventFlags_ & EventHandler::WRITE) &&
- (state_ != StateEnum::CONNECTING)) {
- assert(state_ == StateEnum::ESTABLISHED);
- assert((shutdownFlags_ & SHUT_WRITE) == 0);
- if (sendTimeout_ > 0) {
- if (!writeTimeout_.scheduleTimeout(sendTimeout_)) {
- AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
- withAddr("failed to reschedule send timeout in setSendTimeout"));
- return failWrite(__func__, ex);
- }
- } else {
- writeTimeout_.cancelTimeout();
- }
- }
-}
-
-void AsyncSocket::setReadCB(ReadCallback *callback) {
- VLOG(6) << "AsyncSocket::setReadCallback() this=" << this << ", fd=" << fd_
- << ", callback=" << callback << ", state=" << state_;
-
- // Short circuit if callback is the same as the existing readCallback_.
- //
- // Note that this is needed for proper functioning during some cleanup cases.
- // During cleanup we allow setReadCallback(nullptr) to be called even if the
- // read callback is already unset and we have been detached from an event
- // base. This check prevents us from asserting
- // eventBase_->isInEventBaseThread() when eventBase_ is nullptr.
- if (callback == readCallback_) {
- return;
- }
-
- if (shutdownFlags_ & SHUT_READ) {
- // Reads have already been shut down on this socket.
- //
- // Allow setReadCallback(nullptr) to be called in this case, but don't
- // allow a new callback to be set.
- //
- // For example, setReadCallback(nullptr) can happen after an error if we
- // invoke some other error callback before invoking readError(). The other
- // error callback that is invoked first may go ahead and clear the read
- // callback before we get a chance to invoke readError().
- if (callback != nullptr) {
- return invalidState(callback);
- }
- assert((eventFlags_ & EventHandler::READ) == 0);
- readCallback_ = nullptr;
- return;
- }
-
- DestructorGuard dg(this);
- assert(eventBase_->isInEventBaseThread());
-
- switch ((StateEnum)state_) {
- case StateEnum::CONNECTING:
- // For convenience, we allow the read callback to be set while we are
- // still connecting. We just store the callback for now. Once the
- // connection completes we'll register for read events.
- readCallback_ = callback;
- return;
- case StateEnum::ESTABLISHED:
- {
- readCallback_ = callback;
- uint16_t oldFlags = eventFlags_;
- if (readCallback_) {
- eventFlags_ |= EventHandler::READ;
- } else {
- eventFlags_ &= ~EventHandler::READ;
- }
-
- // Update our registration if our flags have changed
- if (eventFlags_ != oldFlags) {
- // We intentionally ignore the return value here.
- // updateEventRegistration() will move us into the error state if it
- // fails, and we don't need to do anything else here afterwards.
- (void)updateEventRegistration();
- }
-
- if (readCallback_) {
- checkForImmediateRead();
- }
- return;
- }
- case StateEnum::CLOSED:
- case StateEnum::ERROR:
- // We should never reach here. SHUT_READ should always be set
- // if we are in STATE_CLOSED or STATE_ERROR.
- assert(false);
- return invalidState(callback);
- case StateEnum::UNINIT:
- // We do not allow setReadCallback() to be called before we start
- // connecting.
- return invalidState(callback);
- }
-
- // We don't put a default case in the switch statement, so that the compiler
- // will warn us to update the switch statement if a new state is added.
- return invalidState(callback);
-}
-
-AsyncSocket::ReadCallback* AsyncSocket::getReadCallback() const {
- return readCallback_;
-}
-
-void AsyncSocket::write(WriteCallback* callback,
- const void* buf, size_t bytes, WriteFlags flags) {
- iovec op;
- op.iov_base = const_cast<void*>(buf);
- op.iov_len = bytes;
- writeImpl(callback, &op, 1, std::move(unique_ptr<IOBuf>()), flags);
-}
-
-void AsyncSocket::writev(WriteCallback* callback,
- const iovec* vec,
- size_t count,
- WriteFlags flags) {
- writeImpl(callback, vec, count, std::move(unique_ptr<IOBuf>()), flags);
-}
-
-void AsyncSocket::writeChain(WriteCallback* callback, unique_ptr<IOBuf>&& buf,
- WriteFlags flags) {
- size_t count = buf->countChainElements();
- if (count <= 64) {
- iovec vec[count];
- writeChainImpl(callback, vec, count, std::move(buf), flags);
- } else {
- iovec* vec = new iovec[count];
- writeChainImpl(callback, vec, count, std::move(buf), flags);
- delete[] vec;
- }
-}
-
-void AsyncSocket::writeChainImpl(WriteCallback* callback, iovec* vec,
- size_t count, unique_ptr<IOBuf>&& buf, WriteFlags flags) {
- const IOBuf* head = buf.get();
- const IOBuf* next = head;
- unsigned i = 0;
- do {
- vec[i].iov_base = const_cast<uint8_t *>(next->data());
- vec[i].iov_len = next->length();
- // IOBuf can get confused by empty iovec buffers, so increment the
- // output pointer only if the iovec buffer is non-empty. We could
- // end the loop with i < count, but that's ok.
- if (vec[i].iov_len != 0) {
- i++;
- }
- next = next->next();
- } while (next != head);
- writeImpl(callback, vec, i, std::move(buf), flags);
-}
-
-void AsyncSocket::writeImpl(WriteCallback* callback, const iovec* vec,
- size_t count, unique_ptr<IOBuf>&& buf,
- WriteFlags flags) {
- VLOG(6) << "AsyncSocket::writev() this=" << this << ", fd=" << fd_
- << ", callback=" << callback << ", count=" << count
- << ", state=" << state_;
- DestructorGuard dg(this);
- unique_ptr<IOBuf>ioBuf(std::move(buf));
- assert(eventBase_->isInEventBaseThread());
-
- if (shutdownFlags_ & (SHUT_WRITE | SHUT_WRITE_PENDING)) {
- // No new writes may be performed after the write side of the socket has
- // been shutdown.
- //
- // We could just call callback->writeError() here to fail just this write.
- // However, fail hard and use invalidState() to fail all outstanding
- // callbacks and move the socket into the error state. There's most likely
- // a bug in the caller's code, so we abort everything rather than trying to
- // proceed as best we can.
- return invalidState(callback);
- }
-
- uint32_t countWritten = 0;
- uint32_t partialWritten = 0;
- int bytesWritten = 0;
- bool mustRegister = false;
- if (state_ == StateEnum::ESTABLISHED && !connecting()) {
- if (writeReqHead_ == nullptr) {
- // If we are established and there are no other writes pending,
- // we can attempt to perform the write immediately.
- assert(writeReqTail_ == nullptr);
- assert((eventFlags_ & EventHandler::WRITE) == 0);
-
- bytesWritten = performWrite(vec, count, flags,
- &countWritten, &partialWritten);
- if (bytesWritten < 0) {
- AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
- withAddr("writev failed"), errno);
- return failWrite(__func__, callback, 0, ex);
- } else if (countWritten == count) {
- // We successfully wrote everything.
- // Invoke the callback and return.
- if (callback) {
- callback->writeSuccess();
- }
- return;
- } // else { continue writing the next writeReq }
- mustRegister = true;
- }
- } else if (!connecting()) {
- // Invalid state for writing
- return invalidState(callback);
- }
-
- // Create a new WriteRequest to add to the queue
- WriteRequest* req;
- try {
- req = WriteRequest::newRequest(callback, vec + countWritten,
- count - countWritten, std::move(ioBuf),
- flags);
- } catch (const std::exception& ex) {
- // we mainly expect to catch std::bad_alloc here
- AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR,
- withAddr(string("failed to append new WriteRequest: ") + ex.what()));
- return failWrite(__func__, callback, bytesWritten, tex);
- }
- req->consume(0, partialWritten, bytesWritten);
- if (writeReqTail_ == nullptr) {
- assert(writeReqHead_ == nullptr);
- writeReqHead_ = writeReqTail_ = req;
- } else {
- writeReqTail_->append(req);
- writeReqTail_ = req;
- }
-
- // Register for write events if are established and not currently
- // waiting on write events
- if (mustRegister) {
- assert(state_ == StateEnum::ESTABLISHED);
- assert((eventFlags_ & EventHandler::WRITE) == 0);
- if (!updateEventRegistration(EventHandler::WRITE, 0)) {
- assert(state_ == StateEnum::ERROR);
- return;
- }
- if (sendTimeout_ > 0) {
- // Schedule a timeout to fire if the write takes too long.
- if (!writeTimeout_.scheduleTimeout(sendTimeout_)) {
- AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
- withAddr("failed to schedule send timeout"));
- return failWrite(__func__, ex);
- }
- }
- }
-}
-
-void AsyncSocket::close() {
- VLOG(5) << "AsyncSocket::close(): this=" << this << ", fd_=" << fd_
- << ", state=" << state_ << ", shutdownFlags="
- << std::hex << (int) shutdownFlags_;
-
- // close() is only different from closeNow() when there are pending writes
- // that need to drain before we can close. In all other cases, just call
- // closeNow().
- //
- // Note that writeReqHead_ can be non-nullptr even in STATE_CLOSED or
- // STATE_ERROR if close() is invoked while a previous closeNow() or failure
- // is still running. (e.g., If there are multiple pending writes, and we
- // call writeError() on the first one, it may call close(). In this case we
- // will already be in STATE_CLOSED or STATE_ERROR, but the remaining pending
- // writes will still be in the queue.)
- //
- // We only need to drain pending writes if we are still in STATE_CONNECTING
- // or STATE_ESTABLISHED
- if ((writeReqHead_ == nullptr) ||
- !(state_ == StateEnum::CONNECTING ||
- state_ == StateEnum::ESTABLISHED)) {
- closeNow();
- return;
- }
-
- // Declare a DestructorGuard to ensure that the AsyncSocket cannot be
- // destroyed until close() returns.
- DestructorGuard dg(this);
- assert(eventBase_->isInEventBaseThread());
-
- // Since there are write requests pending, we have to set the
- // SHUT_WRITE_PENDING flag, and wait to perform the real close until the
- // connect finishes and we finish writing these requests.
- //
- // Set SHUT_READ to indicate that reads are shut down, and set the
- // SHUT_WRITE_PENDING flag to mark that we want to shutdown once the
- // pending writes complete.
- shutdownFlags_ |= (SHUT_READ | SHUT_WRITE_PENDING);
-
- // If a read callback is set, invoke readEOF() immediately to inform it that
- // the socket has been closed and no more data can be read.
- if (readCallback_) {
- // Disable reads if they are enabled
- if (!updateEventRegistration(0, EventHandler::READ)) {
- // We're now in the error state; callbacks have been cleaned up
- assert(state_ == StateEnum::ERROR);
- assert(readCallback_ == nullptr);
- } else {
- ReadCallback* callback = readCallback_;
- readCallback_ = nullptr;
- callback->readEOF();
- }
- }
-}
-
-void AsyncSocket::closeNow() {
- VLOG(5) << "AsyncSocket::closeNow(): this=" << this << ", fd_=" << fd_
- << ", state=" << state_ << ", shutdownFlags="
- << std::hex << (int) shutdownFlags_;
- DestructorGuard dg(this);
- assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
-
- switch (state_) {
- case StateEnum::ESTABLISHED:
- case StateEnum::CONNECTING:
- {
- shutdownFlags_ |= (SHUT_READ | SHUT_WRITE);
- state_ = StateEnum::CLOSED;
-
- // If the write timeout was set, cancel it.
- writeTimeout_.cancelTimeout();
-
- // If we are registered for I/O events, unregister.
- if (eventFlags_ != EventHandler::NONE) {
- eventFlags_ = EventHandler::NONE;
- if (!updateEventRegistration()) {
- // We will have been moved into the error state.
- assert(state_ == StateEnum::ERROR);
- return;
- }
- }
-
- if (fd_ >= 0) {
- ioHandler_.changeHandlerFD(-1);
- doClose();
- }
-
- if (connectCallback_) {
- ConnectCallback* callback = connectCallback_;
- connectCallback_ = nullptr;
- callback->connectErr(socketClosedLocallyEx);
- }
-
- failAllWrites(socketClosedLocallyEx);
-
- if (readCallback_) {
- ReadCallback* callback = readCallback_;
- readCallback_ = nullptr;
- callback->readEOF();
- }
- return;
- }
- case StateEnum::CLOSED:
- // Do nothing. It's possible that we are being called recursively
- // from inside a callback that we invoked inside another call to close()
- // that is still running.
- return;
- case StateEnum::ERROR:
- // Do nothing. The error handling code has performed (or is performing)
- // cleanup.
- return;
- case StateEnum::UNINIT:
- assert(eventFlags_ == EventHandler::NONE);
- assert(connectCallback_ == nullptr);
- assert(readCallback_ == nullptr);
- assert(writeReqHead_ == nullptr);
- shutdownFlags_ |= (SHUT_READ | SHUT_WRITE);
- state_ = StateEnum::CLOSED;
- return;
- }
-
- LOG(DFATAL) << "AsyncSocket::closeNow() (this=" << this << ", fd=" << fd_
- << ") called in unknown state " << state_;
-}
-
-void AsyncSocket::closeWithReset() {
- // Enable SO_LINGER, with the linger timeout set to 0.
- // This will trigger a TCP reset when we close the socket.
- if (fd_ >= 0) {
- struct linger optLinger = {1, 0};
- if (setSockOpt(SOL_SOCKET, SO_LINGER, &optLinger) != 0) {
- VLOG(2) << "AsyncSocket::closeWithReset(): error setting SO_LINGER "
- << "on " << fd_ << ": errno=" << errno;
- }
- }
-
- // Then let closeNow() take care of the rest
- closeNow();
-}
-
-void AsyncSocket::shutdownWrite() {
- VLOG(5) << "AsyncSocket::shutdownWrite(): this=" << this << ", fd=" << fd_
- << ", state=" << state_ << ", shutdownFlags="
- << std::hex << (int) shutdownFlags_;
-
- // If there are no pending writes, shutdownWrite() is identical to
- // shutdownWriteNow().
- if (writeReqHead_ == nullptr) {
- shutdownWriteNow();
- return;
- }
-
- assert(eventBase_->isInEventBaseThread());
-
- // There are pending writes. Set SHUT_WRITE_PENDING so that the actual
- // shutdown will be performed once all writes complete.
- shutdownFlags_ |= SHUT_WRITE_PENDING;
-}
-
-void AsyncSocket::shutdownWriteNow() {
- VLOG(5) << "AsyncSocket::shutdownWriteNow(): this=" << this
- << ", fd=" << fd_ << ", state=" << state_
- << ", shutdownFlags=" << std::hex << (int) shutdownFlags_;
-
- if (shutdownFlags_ & SHUT_WRITE) {
- // Writes are already shutdown; nothing else to do.
- return;
- }
-
- // If SHUT_READ is already set, just call closeNow() to completely
- // close the socket. This can happen if close() was called with writes
- // pending, and then shutdownWriteNow() is called before all pending writes
- // complete.
- if (shutdownFlags_ & SHUT_READ) {
- closeNow();
- return;
- }
-
- DestructorGuard dg(this);
- assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
-
- switch (static_cast<StateEnum>(state_)) {
- case StateEnum::ESTABLISHED:
- {
- shutdownFlags_ |= SHUT_WRITE;
-
- // If the write timeout was set, cancel it.
- writeTimeout_.cancelTimeout();
-
- // If we are registered for write events, unregister.
- if (!updateEventRegistration(0, EventHandler::WRITE)) {
- // We will have been moved into the error state.
- assert(state_ == StateEnum::ERROR);
- return;
- }
-
- // Shutdown writes on the file descriptor
- ::shutdown(fd_, SHUT_WR);
-
- // Immediately fail all write requests
- failAllWrites(socketShutdownForWritesEx);
- return;
- }
- case StateEnum::CONNECTING:
- {
- // Set the SHUT_WRITE_PENDING flag.
- // When the connection completes, it will check this flag,
- // shutdown the write half of the socket, and then set SHUT_WRITE.
- shutdownFlags_ |= SHUT_WRITE_PENDING;
-
- // Immediately fail all write requests
- failAllWrites(socketShutdownForWritesEx);
- return;
- }
- case StateEnum::UNINIT:
- // Callers normally shouldn't call shutdownWriteNow() before the socket
- // even starts connecting. Nonetheless, go ahead and set
- // SHUT_WRITE_PENDING. Once the socket eventually connects it will
- // immediately shut down the write side of the socket.
- shutdownFlags_ |= SHUT_WRITE_PENDING;
- return;
- case StateEnum::CLOSED:
- case StateEnum::ERROR:
- // We should never get here. SHUT_WRITE should always be set
- // in STATE_CLOSED and STATE_ERROR.
- VLOG(4) << "AsyncSocket::shutdownWriteNow() (this=" << this
- << ", fd=" << fd_ << ") in unexpected state " << state_
- << " with SHUT_WRITE not set ("
- << std::hex << (int) shutdownFlags_ << ")";
- assert(false);
- return;
- }
-
- LOG(DFATAL) << "AsyncSocket::shutdownWriteNow() (this=" << this << ", fd="
- << fd_ << ") called in unknown state " << state_;
-}
-
-bool AsyncSocket::readable() const {
- if (fd_ == -1) {
- return false;
- }
- struct pollfd fds[1];
- fds[0].fd = fd_;
- fds[0].events = POLLIN;
- fds[0].revents = 0;
- int rc = poll(fds, 1, 0);
- return rc == 1;
-}
-
-bool AsyncSocket::isPending() const {
- return ioHandler_.isPending();
-}
-
-bool AsyncSocket::hangup() const {
- if (fd_ == -1) {
- // sanity check, no one should ask for hangup if we are not connected.
- assert(false);
- return false;
- }
-#ifdef POLLRDHUP // Linux-only
- struct pollfd fds[1];
- fds[0].fd = fd_;
- fds[0].events = POLLRDHUP|POLLHUP;
- fds[0].revents = 0;
- poll(fds, 1, 0);
- return (fds[0].revents & (POLLRDHUP|POLLHUP)) != 0;
-#else
- return false;
-#endif
-}
-
-bool AsyncSocket::good() const {
- return ((state_ == StateEnum::CONNECTING ||
- state_ == StateEnum::ESTABLISHED) &&
- (shutdownFlags_ == 0) && (eventBase_ != nullptr));
-}
-
-bool AsyncSocket::error() const {
- return (state_ == StateEnum::ERROR);
-}
-
-void AsyncSocket::attachEventBase(EventBase* eventBase) {
- VLOG(5) << "AsyncSocket::attachEventBase(this=" << this << ", fd=" << fd_
- << ", old evb=" << eventBase_ << ", new evb=" << eventBase
- << ", state=" << state_ << ", events="
- << std::hex << eventFlags_ << ")";
- assert(eventBase_ == nullptr);
- assert(eventBase->isInEventBaseThread());
-
- eventBase_ = eventBase;
- ioHandler_.attachEventBase(eventBase);
- writeTimeout_.attachEventBase(eventBase);
-}
-
-void AsyncSocket::detachEventBase() {
- VLOG(5) << "AsyncSocket::detachEventBase(this=" << this << ", fd=" << fd_
- << ", old evb=" << eventBase_ << ", state=" << state_
- << ", events=" << std::hex << eventFlags_ << ")";
- assert(eventBase_ != nullptr);
- assert(eventBase_->isInEventBaseThread());
-
- eventBase_ = nullptr;
- ioHandler_.detachEventBase();
- writeTimeout_.detachEventBase();
-}
-
-bool AsyncSocket::isDetachable() const {
- DCHECK(eventBase_ != nullptr);
- DCHECK(eventBase_->isInEventBaseThread());
-
- return !ioHandler_.isHandlerRegistered() && !writeTimeout_.isScheduled();
-}
-
-void AsyncSocket::getLocalAddress(folly::SocketAddress* address) const {
- address->setFromLocalAddress(fd_);
-}
-
-void AsyncSocket::getPeerAddress(folly::SocketAddress* address) const {
- if (!addr_.isInitialized()) {
- addr_.setFromPeerAddress(fd_);
- }
- *address = addr_;
-}
-
-int AsyncSocket::setNoDelay(bool noDelay) {
- if (fd_ < 0) {
- VLOG(4) << "AsyncSocket::setNoDelay() called on non-open socket "
- << this << "(state=" << state_ << ")";
- return EINVAL;
-
- }
-
- int value = noDelay ? 1 : 0;
- if (setsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &value, sizeof(value)) != 0) {
- int errnoCopy = errno;
- VLOG(2) << "failed to update TCP_NODELAY option on AsyncSocket "
- << this << " (fd=" << fd_ << ", state=" << state_ << "): "
- << strerror(errnoCopy);
- return errnoCopy;
- }
-
- return 0;
-}
-
-int AsyncSocket::setCongestionFlavor(const std::string &cname) {
-
- #ifndef TCP_CONGESTION
- #define TCP_CONGESTION 13
- #endif
-
- if (fd_ < 0) {
- VLOG(4) << "AsyncSocket::setCongestionFlavor() called on non-open "
- << "socket " << this << "(state=" << state_ << ")";
- return EINVAL;
-
- }
-
- if (setsockopt(fd_, IPPROTO_TCP, TCP_CONGESTION, cname.c_str(),
- cname.length() + 1) != 0) {
- int errnoCopy = errno;
- VLOG(2) << "failed to update TCP_CONGESTION option on AsyncSocket "
- << this << "(fd=" << fd_ << ", state=" << state_ << "): "
- << strerror(errnoCopy);
- return errnoCopy;
- }
-
- return 0;
-}
-
-int AsyncSocket::setQuickAck(bool quickack) {
- if (fd_ < 0) {
- VLOG(4) << "AsyncSocket::setQuickAck() called on non-open socket "
- << this << "(state=" << state_ << ")";
- return EINVAL;
-
- }
-
-#ifdef TCP_QUICKACK // Linux-only
- int value = quickack ? 1 : 0;
- if (setsockopt(fd_, IPPROTO_TCP, TCP_QUICKACK, &value, sizeof(value)) != 0) {
- int errnoCopy = errno;
- VLOG(2) << "failed to update TCP_QUICKACK option on AsyncSocket"
- << this << "(fd=" << fd_ << ", state=" << state_ << "): "
- << strerror(errnoCopy);
- return errnoCopy;
- }
-
- return 0;
-#else
- return ENOSYS;
-#endif
-}
-
-int AsyncSocket::setSendBufSize(size_t bufsize) {
- if (fd_ < 0) {
- VLOG(4) << "AsyncSocket::setSendBufSize() called on non-open socket "
- << this << "(state=" << state_ << ")";
- return EINVAL;
- }
-
- if (setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &bufsize, sizeof(bufsize)) !=0) {
- int errnoCopy = errno;
- VLOG(2) << "failed to update SO_SNDBUF option on AsyncSocket"
- << this << "(fd=" << fd_ << ", state=" << state_ << "): "
- << strerror(errnoCopy);
- return errnoCopy;
- }
-
- return 0;
-}
-
-int AsyncSocket::setRecvBufSize(size_t bufsize) {
- if (fd_ < 0) {
- VLOG(4) << "AsyncSocket::setRecvBufSize() called on non-open socket "
- << this << "(state=" << state_ << ")";
- return EINVAL;
- }
-
- if (setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &bufsize, sizeof(bufsize)) !=0) {
- int errnoCopy = errno;
- VLOG(2) << "failed to update SO_RCVBUF option on AsyncSocket"
- << this << "(fd=" << fd_ << ", state=" << state_ << "): "
- << strerror(errnoCopy);
- return errnoCopy;
- }
-
- return 0;
-}
-
-int AsyncSocket::setTCPProfile(int profd) {
- if (fd_ < 0) {
- VLOG(4) << "AsyncSocket::setTCPProfile() called on non-open socket "
- << this << "(state=" << state_ << ")";
- return EINVAL;
- }
-
- if (setsockopt(fd_, SOL_SOCKET, SO_SET_NAMESPACE, &profd, sizeof(int)) !=0) {
- int errnoCopy = errno;
- VLOG(2) << "failed to set socket namespace option on AsyncSocket"
- << this << "(fd=" << fd_ << ", state=" << state_ << "): "
- << strerror(errnoCopy);
- return errnoCopy;
- }
-
- return 0;
-}
-
-void AsyncSocket::ioReady(uint16_t events) noexcept {
- VLOG(7) << "AsyncSocket::ioRead() this=" << this << ", fd" << fd_
- << ", events=" << std::hex << events << ", state=" << state_;
- DestructorGuard dg(this);
- assert(events & EventHandler::READ_WRITE);
- assert(eventBase_->isInEventBaseThread());
-
- uint16_t relevantEvents = events & EventHandler::READ_WRITE;
- if (relevantEvents == EventHandler::READ) {
- handleRead();
- } else if (relevantEvents == EventHandler::WRITE) {
- handleWrite();
- } else if (relevantEvents == EventHandler::READ_WRITE) {
- EventBase* originalEventBase = eventBase_;
- // If both read and write events are ready, process writes first.
- handleWrite();
-
- // Return now if handleWrite() detached us from our EventBase
- if (eventBase_ != originalEventBase) {
- return;
- }
-
- // Only call handleRead() if a read callback is still installed.
- // (It's possible that the read callback was uninstalled during
- // handleWrite().)
- if (readCallback_) {
- handleRead();
- }
- } else {
- VLOG(4) << "AsyncSocket::ioRead() called with unexpected events "
- << std::hex << events << "(this=" << this << ")";
- abort();
- }
-}
-
-ssize_t AsyncSocket::performRead(void* buf, size_t buflen) {
- ssize_t bytes = recv(fd_, buf, buflen, MSG_DONTWAIT);
- if (bytes < 0) {
- if (errno == EAGAIN || errno == EWOULDBLOCK) {
- // No more data to read right now.
- return READ_BLOCKING;
- } else {
- return READ_ERROR;
- }
- } else {
- appBytesReceived_ += bytes;
- return bytes;
- }
-}
-
-void AsyncSocket::handleRead() noexcept {
- VLOG(5) << "AsyncSocket::handleRead() this=" << this << ", fd=" << fd_
- << ", state=" << state_;
- assert(state_ == StateEnum::ESTABLISHED);
- assert((shutdownFlags_ & SHUT_READ) == 0);
- assert(readCallback_ != nullptr);
- assert(eventFlags_ & EventHandler::READ);
-
- // Loop until:
- // - a read attempt would block
- // - readCallback_ is uninstalled
- // - the number of loop iterations exceeds the optional maximum
- // - this AsyncSocket is moved to another EventBase
- //
- // When we invoke readDataAvailable() it may uninstall the readCallback_,
- // which is why need to check for it here.
- //
- // The last bullet point is slightly subtle. readDataAvailable() may also
- // detach this socket from this EventBase. However, before
- // readDataAvailable() returns another thread may pick it up, attach it to
- // a different EventBase, and install another readCallback_. We need to
- // exit immediately after readDataAvailable() returns if the eventBase_ has
- // changed. (The caller must perform some sort of locking to transfer the
- // AsyncSocket between threads properly. This will be sufficient to ensure
- // that this thread sees the updated eventBase_ variable after
- // readDataAvailable() returns.)
- uint16_t numReads = 0;
- EventBase* originalEventBase = eventBase_;
- while (readCallback_ && eventBase_ == originalEventBase) {
- // Get the buffer to read into.
- void* buf = nullptr;
- size_t buflen = 0;
- try {
- readCallback_->getReadBuffer(&buf, &buflen);
- } catch (const AsyncSocketException& ex) {
- return failRead(__func__, ex);
- } catch (const std::exception& ex) {
- AsyncSocketException tex(AsyncSocketException::BAD_ARGS,
- string("ReadCallback::getReadBuffer() "
- "threw exception: ") +
- ex.what());
- return failRead(__func__, tex);
- } catch (...) {
- AsyncSocketException ex(AsyncSocketException::BAD_ARGS,
- "ReadCallback::getReadBuffer() threw "
- "non-exception type");
- return failRead(__func__, ex);
- }
- if (buf == nullptr || buflen == 0) {
- AsyncSocketException ex(AsyncSocketException::BAD_ARGS,
- "ReadCallback::getReadBuffer() returned "
- "empty buffer");
- return failRead(__func__, ex);
- }
-
- // Perform the read
- ssize_t bytesRead = performRead(buf, buflen);
- if (bytesRead > 0) {
- readCallback_->readDataAvailable(bytesRead);
- // Fall through and continue around the loop if the read
- // completely filled the available buffer.
- // Note that readCallback_ may have been uninstalled or changed inside
- // readDataAvailable().
- if (bytesRead < buflen) {
- return;
- }
- } else if (bytesRead == READ_BLOCKING) {
- // No more data to read right now.
- return;
- } else if (bytesRead == READ_ERROR) {
- AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
- withAddr("recv() failed"), errno);
- return failRead(__func__, ex);
- } else {
- assert(bytesRead == READ_EOF);
- // EOF
- shutdownFlags_ |= SHUT_READ;
- if (!updateEventRegistration(0, EventHandler::READ)) {
- // we've already been moved into STATE_ERROR
- assert(state_ == StateEnum::ERROR);
- assert(readCallback_ == nullptr);
- return;
- }
-
- ReadCallback* callback = readCallback_;
- readCallback_ = nullptr;
- callback->readEOF();
- return;
- }
- if (maxReadsPerEvent_ && (++numReads >= maxReadsPerEvent_)) {
- return;
- }
- }
-}
-
-/**
- * This function attempts to write as much data as possible, until no more data
- * can be written.
- *
- * - If it sends all available data, it unregisters for write events, and stops
- * the writeTimeout_.
- *
- * - If not all of the data can be sent immediately, it reschedules
- * writeTimeout_ (if a non-zero timeout is set), and ensures the handler is
- * registered for write events.
- */
-void AsyncSocket::handleWrite() noexcept {
- VLOG(5) << "AsyncSocket::handleWrite() this=" << this << ", fd=" << fd_
- << ", state=" << state_;
- if (state_ == StateEnum::CONNECTING) {
- handleConnect();
- return;
- }
-
- // Normal write
- assert(state_ == StateEnum::ESTABLISHED);
- assert((shutdownFlags_ & SHUT_WRITE) == 0);
- assert(writeReqHead_ != nullptr);
-
- // Loop until we run out of write requests,
- // or until this socket is moved to another EventBase.
- // (See the comment in handleRead() explaining how this can happen.)
- EventBase* originalEventBase = eventBase_;
- while (writeReqHead_ != nullptr && eventBase_ == originalEventBase) {
- uint32_t countWritten;
- uint32_t partialWritten;
- WriteFlags writeFlags = writeReqHead_->flags();
- if (writeReqHead_->getNext() != nullptr) {
- writeFlags = writeFlags | WriteFlags::CORK;
- }
- int bytesWritten = performWrite(writeReqHead_->getOps(),
- writeReqHead_->getOpCount(),
- writeFlags, &countWritten, &partialWritten);
- if (bytesWritten < 0) {
- AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
- withAddr("writev() failed"), errno);
- return failWrite(__func__, ex);
- } else if (countWritten == writeReqHead_->getOpCount()) {
- // We finished this request
- WriteRequest* req = writeReqHead_;
- writeReqHead_ = req->getNext();
-
- if (writeReqHead_ == nullptr) {
- writeReqTail_ = nullptr;
- // This is the last write request.
- // Unregister for write events and cancel the send timer
- // before we invoke the callback. We have to update the state properly
- // before calling the callback, since it may want to detach us from
- // the EventBase.
- if (eventFlags_ & EventHandler::WRITE) {
- if (!updateEventRegistration(0, EventHandler::WRITE)) {
- assert(state_ == StateEnum::ERROR);
- return;
- }
- // Stop the send timeout
- writeTimeout_.cancelTimeout();
- }
- assert(!writeTimeout_.isScheduled());
-
- // If SHUT_WRITE_PENDING is set, we should shutdown the socket after
- // we finish sending the last write request.
- //
- // We have to do this before invoking writeSuccess(), since
- // writeSuccess() may detach us from our EventBase.
- if (shutdownFlags_ & SHUT_WRITE_PENDING) {
- assert(connectCallback_ == nullptr);
- shutdownFlags_ |= SHUT_WRITE;
-
- if (shutdownFlags_ & SHUT_READ) {
- // Reads have already been shutdown. Fully close the socket and
- // move to STATE_CLOSED.
- //
- // Note: This code currently moves us to STATE_CLOSED even if
- // close() hasn't ever been called. This can occur if we have
- // received EOF from the peer and shutdownWrite() has been called
- // locally. Should we bother staying in STATE_ESTABLISHED in this
- // case, until close() is actually called? I can't think of a
- // reason why we would need to do so. No other operations besides
- // calling close() or destroying the socket can be performed at
- // this point.
- assert(readCallback_ == nullptr);
- state_ = StateEnum::CLOSED;
- if (fd_ >= 0) {
- ioHandler_.changeHandlerFD(-1);
- doClose();
- }
- } else {
- // Reads are still enabled, so we are only doing a half-shutdown
- ::shutdown(fd_, SHUT_WR);
- }
- }
- }
-
- // Invoke the callback
- WriteCallback* callback = req->getCallback();
- req->destroy();
- if (callback) {
- callback->writeSuccess();
- }
- // We'll continue around the loop, trying to write another request
- } else {
- // Partial write.
- writeReqHead_->consume(countWritten, partialWritten, bytesWritten);
- // Stop after a partial write; it's highly likely that a subsequent write
- // attempt will just return EAGAIN.
- //
- // Ensure that we are registered for write events.
- if ((eventFlags_ & EventHandler::WRITE) == 0) {
- if (!updateEventRegistration(EventHandler::WRITE, 0)) {
- assert(state_ == StateEnum::ERROR);
- return;
- }
- }
-
- // Reschedule the send timeout, since we have made some write progress.
- if (sendTimeout_ > 0) {
- if (!writeTimeout_.scheduleTimeout(sendTimeout_)) {
- AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
- withAddr("failed to reschedule write timeout"));
- return failWrite(__func__, ex);
- }
- }
- return;
- }
- }
-}
-
-void AsyncSocket::checkForImmediateRead() noexcept {
- // We currently don't attempt to perform optimistic reads in AsyncSocket.
- // (However, note that some subclasses do override this method.)
- //
- // Simply calling handleRead() here would be bad, as this would call
- // readCallback_->getReadBuffer(), forcing the callback to allocate a read
- // buffer even though no data may be available. This would waste lots of
- // memory, since the buffer will sit around unused until the socket actually
- // becomes readable.
- //
- // Checking if the socket is readable now also seems like it would probably
- // be a pessimism. In most cases it probably wouldn't be readable, and we
- // would just waste an extra system call. Even if it is readable, waiting to
- // find out from libevent on the next event loop doesn't seem that bad.
-}
-
-void AsyncSocket::handleInitialReadWrite() noexcept {
- // Our callers should already be holding a DestructorGuard, but grab
- // one here just to make sure, in case one of our calling code paths ever
- // changes.
- DestructorGuard dg(this);
-
- // If we have a readCallback_, make sure we enable read events. We
- // may already be registered for reads if connectSuccess() set
- // the read calback.
- if (readCallback_ && !(eventFlags_ & EventHandler::READ)) {
- assert(state_ == StateEnum::ESTABLISHED);
- assert((shutdownFlags_ & SHUT_READ) == 0);
- if (!updateEventRegistration(EventHandler::READ, 0)) {
- assert(state_ == StateEnum::ERROR);
- return;
- }
- checkForImmediateRead();
- } else if (readCallback_ == nullptr) {
- // Unregister for read events.
- updateEventRegistration(0, EventHandler::READ);
- }
-
- // If we have write requests pending, try to send them immediately.
- // Since we just finished accepting, there is a very good chance that we can
- // write without blocking.
- //
- // However, we only process them if EventHandler::WRITE is not already set,
- // which means that we're already blocked on a write attempt. (This can
- // happen if connectSuccess() called write() before returning.)
- if (writeReqHead_ && !(eventFlags_ & EventHandler::WRITE)) {
- // Call handleWrite() to perform write processing.
- handleWrite();
- } else if (writeReqHead_ == nullptr) {
- // Unregister for write event.
- updateEventRegistration(0, EventHandler::WRITE);
- }
-}
-
-void AsyncSocket::handleConnect() noexcept {
- VLOG(5) << "AsyncSocket::handleConnect() this=" << this << ", fd=" << fd_
- << ", state=" << state_;
- assert(state_ == StateEnum::CONNECTING);
- // SHUT_WRITE can never be set while we are still connecting;
- // SHUT_WRITE_PENDING may be set, be we only set SHUT_WRITE once the connect
- // finishes
- assert((shutdownFlags_ & SHUT_WRITE) == 0);
-
- // In case we had a connect timeout, cancel the timeout
- writeTimeout_.cancelTimeout();
- // We don't use a persistent registration when waiting on a connect event,
- // so we have been automatically unregistered now. Update eventFlags_ to
- // reflect reality.
- assert(eventFlags_ == EventHandler::WRITE);
- eventFlags_ = EventHandler::NONE;
-
- // Call getsockopt() to check if the connect succeeded
- int error;
- socklen_t len = sizeof(error);
- int rv = getsockopt(fd_, SOL_SOCKET, SO_ERROR, &error, &len);
- if (rv != 0) {
- AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
- withAddr("error calling getsockopt() after connect"),
- errno);
- VLOG(4) << "AsyncSocket::handleConnect(this=" << this << ", fd="
- << fd_ << " host=" << addr_.describe()
- << ") exception:" << ex.what();
- return failConnect(__func__, ex);
- }
-
- if (error != 0) {
- AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
- "connect failed", error);
- VLOG(1) << "AsyncSocket::handleConnect(this=" << this << ", fd="
- << fd_ << " host=" << addr_.describe()
- << ") exception: " << ex.what();
- return failConnect(__func__, ex);
- }
-
- // Move into STATE_ESTABLISHED
- state_ = StateEnum::ESTABLISHED;
-
- // If SHUT_WRITE_PENDING is set and we don't have any write requests to
- // perform, immediately shutdown the write half of the socket.
- if ((shutdownFlags_ & SHUT_WRITE_PENDING) && writeReqHead_ == nullptr) {
- // SHUT_READ shouldn't be set. If close() is called on the socket while we
- // are still connecting we just abort the connect rather than waiting for
- // it to complete.
- assert((shutdownFlags_ & SHUT_READ) == 0);
- ::shutdown(fd_, SHUT_WR);
- shutdownFlags_ |= SHUT_WRITE;
- }
-
- VLOG(7) << "AsyncSocket " << this << ": fd " << fd_
- << "successfully connected; state=" << state_;
-
- // Remember the EventBase we are attached to, before we start invoking any
- // callbacks (since the callbacks may call detachEventBase()).
- EventBase* originalEventBase = eventBase_;
-
- // Call the connect callback.
- if (connectCallback_) {
- ConnectCallback* callback = connectCallback_;
- connectCallback_ = nullptr;
- callback->connectSuccess();
- }
-
- // Note that the connect callback may have changed our state.
- // (set or unset the read callback, called write(), closed the socket, etc.)
- // The following code needs to handle these situations correctly.
- //
- // If the socket has been closed, readCallback_ and writeReqHead_ will
- // always be nullptr, so that will prevent us from trying to read or write.
- //
- // The main thing to check for is if eventBase_ is still originalEventBase.
- // If not, we have been detached from this event base, so we shouldn't
- // perform any more operations.
- if (eventBase_ != originalEventBase) {
- return;
- }
-
- handleInitialReadWrite();
-}
-
-void AsyncSocket::timeoutExpired() noexcept {
- VLOG(7) << "AsyncSocket " << this << ", fd " << fd_ << ": timeout expired: "
- << "state=" << state_ << ", events=" << std::hex << eventFlags_;
- DestructorGuard dg(this);
- assert(eventBase_->isInEventBaseThread());
-
- if (state_ == StateEnum::CONNECTING) {
- // connect() timed out
- // Unregister for I/O events.
- AsyncSocketException ex(AsyncSocketException::TIMED_OUT,
- "connect timed out");
- failConnect(__func__, ex);
- } else {
- // a normal write operation timed out
- assert(state_ == StateEnum::ESTABLISHED);
- AsyncSocketException ex(AsyncSocketException::TIMED_OUT, "write timed out");
- failWrite(__func__, ex);
- }
-}
-
-ssize_t AsyncSocket::performWrite(const iovec* vec,
- uint32_t count,
- WriteFlags flags,
- uint32_t* countWritten,
- uint32_t* partialWritten) {
- // We use sendmsg() instead of writev() so that we can pass in MSG_NOSIGNAL
- // We correctly handle EPIPE errors, so we never want to receive SIGPIPE
- // (since it may terminate the program if the main program doesn't explicitly
- // ignore it).
- struct msghdr msg;
- msg.msg_name = nullptr;
- msg.msg_namelen = 0;
- msg.msg_iov = const_cast<iovec *>(vec);
-#ifdef IOV_MAX // not defined on Android
- msg.msg_iovlen = std::min(count, (uint32_t)IOV_MAX);
-#else
- msg.msg_iovlen = std::min(count, (uint32_t)UIO_MAXIOV);
-#endif
- msg.msg_control = nullptr;
- msg.msg_controllen = 0;
- msg.msg_flags = 0;
-
- int msg_flags = MSG_DONTWAIT;
-
-#ifdef MSG_NOSIGNAL // Linux-only
- msg_flags |= MSG_NOSIGNAL;
- if (isSet(flags, WriteFlags::CORK)) {
- // MSG_MORE tells the kernel we have more data to send, so wait for us to
- // give it the rest of the data rather than immediately sending a partial
- // frame, even when TCP_NODELAY is enabled.
- msg_flags |= MSG_MORE;
- }
-#endif
- if (isSet(flags, WriteFlags::EOR)) {
- // marks that this is the last byte of a record (response)
- msg_flags |= MSG_EOR;
- }
- ssize_t totalWritten = ::sendmsg(fd_, &msg, msg_flags);
- if (totalWritten < 0) {
- if (errno == EAGAIN) {
- // TCP buffer is full; we can't write any more data right now.
- *countWritten = 0;
- *partialWritten = 0;
- return 0;
- }
- // error
- *countWritten = 0;
- *partialWritten = 0;
- return -1;
- }
-
- appBytesWritten_ += totalWritten;
-
- uint32_t bytesWritten;
- uint32_t n;
- for (bytesWritten = totalWritten, n = 0; n < count; ++n) {
- const iovec* v = vec + n;
- if (v->iov_len > bytesWritten) {
- // Partial write finished in the middle of this iovec
- *countWritten = n;
- *partialWritten = bytesWritten;
- return totalWritten;
- }
-
- bytesWritten -= v->iov_len;
- }
-
- assert(bytesWritten == 0);
- *countWritten = n;
- *partialWritten = 0;
- return totalWritten;
-}
-
-/**
- * Re-register the EventHandler after eventFlags_ has changed.
- *
- * If an error occurs, fail() is called to move the socket into the error state
- * and call all currently installed callbacks. After an error, the
- * AsyncSocket is completely unregistered.
- *
- * @return Returns true on succcess, or false on error.
- */
-bool AsyncSocket::updateEventRegistration() {
- VLOG(5) << "AsyncSocket::updateEventRegistration(this=" << this
- << ", fd=" << fd_ << ", evb=" << eventBase_ << ", state=" << state_
- << ", events=" << std::hex << eventFlags_;
- assert(eventBase_->isInEventBaseThread());
- if (eventFlags_ == EventHandler::NONE) {
- ioHandler_.unregisterHandler();
- return true;
- }
-
- // Always register for persistent events, so we don't have to re-register
- // after being called back.
- if (!ioHandler_.registerHandler(eventFlags_ | EventHandler::PERSIST)) {
- eventFlags_ = EventHandler::NONE; // we're not registered after error
- AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
- withAddr("failed to update AsyncSocket event registration"));
- fail("updateEventRegistration", ex);
- return false;
- }
-
- return true;
-}
-
-bool AsyncSocket::updateEventRegistration(uint16_t enable,
- uint16_t disable) {
- uint16_t oldFlags = eventFlags_;
- eventFlags_ |= enable;
- eventFlags_ &= ~disable;
- if (eventFlags_ == oldFlags) {
- return true;
- } else {
- return updateEventRegistration();
- }
-}
-
-void AsyncSocket::startFail() {
- // startFail() should only be called once
- assert(state_ != StateEnum::ERROR);
- assert(getDestructorGuardCount() > 0);
- state_ = StateEnum::ERROR;
- // Ensure that SHUT_READ and SHUT_WRITE are set,
- // so all future attempts to read or write will be rejected
- shutdownFlags_ |= (SHUT_READ | SHUT_WRITE);
-
- if (eventFlags_ != EventHandler::NONE) {
- eventFlags_ = EventHandler::NONE;
- ioHandler_.unregisterHandler();
- }
- writeTimeout_.cancelTimeout();
-
- if (fd_ >= 0) {
- ioHandler_.changeHandlerFD(-1);
- doClose();
- }
-}
-
-void AsyncSocket::finishFail() {
- assert(state_ == StateEnum::ERROR);
- assert(getDestructorGuardCount() > 0);
-
- AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
- withAddr("socket closing after error"));
- if (connectCallback_) {
- ConnectCallback* callback = connectCallback_;
- connectCallback_ = nullptr;
- callback->connectErr(ex);
- }
-
- failAllWrites(ex);
-
- if (readCallback_) {
- ReadCallback* callback = readCallback_;
- readCallback_ = nullptr;
- callback->readErr(ex);
- }
-}
-
-void AsyncSocket::fail(const char* fn, const AsyncSocketException& ex) {
- VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
- << state_ << " host=" << addr_.describe()
- << "): failed in " << fn << "(): "
- << ex.what();
- startFail();
- finishFail();
-}
-
-void AsyncSocket::failConnect(const char* fn, const AsyncSocketException& ex) {
- VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
- << state_ << " host=" << addr_.describe()
- << "): failed while connecting in " << fn << "(): "
- << ex.what();
- startFail();
-
- if (connectCallback_ != nullptr) {
- ConnectCallback* callback = connectCallback_;
- connectCallback_ = nullptr;
- callback->connectErr(ex);
- }
-
- finishFail();
-}
-
-void AsyncSocket::failRead(const char* fn, const AsyncSocketException& ex) {
- VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
- << state_ << " host=" << addr_.describe()
- << "): failed while reading in " << fn << "(): "
- << ex.what();
- startFail();
-
- if (readCallback_ != nullptr) {
- ReadCallback* callback = readCallback_;
- readCallback_ = nullptr;
- callback->readErr(ex);
- }
-
- finishFail();
-}
-
-void AsyncSocket::failWrite(const char* fn, const AsyncSocketException& ex) {
- VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
- << state_ << " host=" << addr_.describe()
- << "): failed while writing in " << fn << "(): "
- << ex.what();
- startFail();
-
- // Only invoke the first write callback, since the error occurred while
- // writing this request. Let any other pending write callbacks be invoked in
- // finishFail().
- if (writeReqHead_ != nullptr) {
- WriteRequest* req = writeReqHead_;
- writeReqHead_ = req->getNext();
- WriteCallback* callback = req->getCallback();
- uint32_t bytesWritten = req->getBytesWritten();
- req->destroy();
- if (callback) {
- callback->writeErr(bytesWritten, ex);
- }
- }
-
- finishFail();
-}
-
-void AsyncSocket::failWrite(const char* fn, WriteCallback* callback,
- size_t bytesWritten,
- const AsyncSocketException& ex) {
- // This version of failWrite() is used when the failure occurs before
- // we've added the callback to writeReqHead_.
- VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
- << state_ << " host=" << addr_.describe()
- <<"): failed while writing in " << fn << "(): "
- << ex.what();
- startFail();
-
- if (callback != nullptr) {
- callback->writeErr(bytesWritten, ex);
- }
-
- finishFail();
-}
-
-void AsyncSocket::failAllWrites(const AsyncSocketException& ex) {
- // Invoke writeError() on all write callbacks.
- // This is used when writes are forcibly shutdown with write requests
- // pending, or when an error occurs with writes pending.
- while (writeReqHead_ != nullptr) {
- WriteRequest* req = writeReqHead_;
- writeReqHead_ = req->getNext();
- WriteCallback* callback = req->getCallback();
- if (callback) {
- callback->writeErr(req->getBytesWritten(), ex);
- }
- req->destroy();
- }
-}
-
-void AsyncSocket::invalidState(ConnectCallback* callback) {
- VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_
- << "): connect() called in invalid state " << state_;
-
- /*
- * The invalidState() methods don't use the normal failure mechanisms,
- * since we don't know what state we are in. We don't want to call
- * startFail()/finishFail() recursively if we are already in the middle of
- * cleaning up.
- */
-
- AsyncSocketException ex(AsyncSocketException::ALREADY_OPEN,
- "connect() called with socket in invalid state");
- if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
- if (callback) {
- callback->connectErr(ex);
- }
- } else {
- // We can't use failConnect() here since connectCallback_
- // may already be set to another callback. Invoke this ConnectCallback
- // here; any other connectCallback_ will be invoked in finishFail()
- startFail();
- if (callback) {
- callback->connectErr(ex);
- }
- finishFail();
- }
-}
-
-void AsyncSocket::invalidState(ReadCallback* callback) {
- VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_
- << "): setReadCallback(" << callback
- << ") called in invalid state " << state_;
-
- AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
- "setReadCallback() called with socket in "
- "invalid state");
- if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
- if (callback) {
- callback->readErr(ex);
- }
- } else {
- startFail();
- if (callback) {
- callback->readErr(ex);
- }
- finishFail();
- }
-}
-
-void AsyncSocket::invalidState(WriteCallback* callback) {
- VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_
- << "): write() called in invalid state " << state_;
-
- AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
- withAddr("write() called with socket in invalid state"));
- if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
- if (callback) {
- callback->writeErr(0, ex);
- }
- } else {
- startFail();
- if (callback) {
- callback->writeErr(0, ex);
- }
- finishFail();
- }
-}
-
-void AsyncSocket::doClose() {
- if (fd_ == -1) return;
- if (shutdownSocketSet_) {
- shutdownSocketSet_->close(fd_);
- } else {
- ::close(fd_);
- }
- fd_ = -1;
-}
-
-std::ostream& operator << (std::ostream& os,
- const AsyncSocket::StateEnum& state) {
- os << static_cast<int>(state);
- return os;
-}
-
-std::string AsyncSocket::withAddr(const std::string& s) {
- // Don't use addr_ directly because it may not be initialized
- // e.g. if constructed from fd
- folly::SocketAddress peer, local;
- try {
- getPeerAddress(&peer);
- getLocalAddress(&local);
- } catch (const std::exception&) {
- // ignore
- } catch (...) {
- // ignore
- }
- return s + " (peer=" + peer.describe() + ", local=" + local.describe() + ")";
-}
-
-} // folly
+++ /dev/null
-/*
- * Copyright 2014 Facebook, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <sys/types.h>
-#include <sys/socket.h>
-#include <glog/logging.h>
-#include <folly/SocketAddress.h>
-#include <folly/io/ShutdownSocketSet.h>
-#include <folly/io/IOBuf.h>
-#include <folly/io/async/AsyncTimeout.h>
-#include <folly/io/async/AsyncSocketException.h>
-#include <folly/io/async/AsyncTransport.h>
-#include <folly/io/async/EventHandler.h>
-#include <folly/io/async/DelayedDestruction.h>
-
-#include <memory>
-#include <map>
-
-namespace folly {
-
-/**
- * A class for performing asynchronous I/O on a socket.
- *
- * AsyncSocket allows users to asynchronously wait for data on a socket, and
- * to asynchronously send data.
- *
- * The APIs for reading and writing are intentionally asymmetric. Waiting for
- * data to read is a persistent API: a callback is installed, and is notified
- * whenever new data is available. It continues to be notified of new events
- * until it is uninstalled.
- *
- * AsyncSocket does not provide read timeout functionality, because it
- * typically cannot determine when the timeout should be active. Generally, a
- * timeout should only be enabled when processing is blocked waiting on data
- * from the remote endpoint. For server sockets, the timeout should not be
- * active if the server is currently processing one or more outstanding
- * requests for this socket. For client sockets, the timeout should not be
- * active if there are no requests pending on the socket. Additionally, if a
- * client has multiple pending requests, it will ususally want a separate
- * timeout for each request, rather than a single read timeout.
- *
- * The write API is fairly intuitive: a user can request to send a block of
- * data, and a callback will be informed once the entire block has been
- * transferred to the kernel, or on error. AsyncSocket does provide a send
- * timeout, since most callers want to give up if the remote end stops
- * responding and no further progress can be made sending the data.
- */
-
-class AsyncSocket : virtual public AsyncTransport {
- public:
- typedef std::unique_ptr<AsyncSocket, Destructor> UniquePtr;
-
- class ConnectCallback {
- public:
- virtual ~ConnectCallback() {}
-
- /**
- * connectSuccess() will be invoked when the connection has been
- * successfully established.
- */
- virtual void connectSuccess() noexcept = 0;
-
- /**
- * connectErr() will be invoked if the connection attempt fails.
- *
- * @param ex An exception describing the error that occurred.
- */
- virtual void connectErr(const AsyncSocketException& ex)
- noexcept = 0;
- };
-
- class ReadCallback {
- public:
- virtual ~ReadCallback() {}
-
- /**
- * When data becomes available, getReadBuffer() will be invoked to get the
- * buffer into which data should be read.
- *
- * This method allows the ReadCallback to delay buffer allocation until
- * data becomes available. This allows applications to manage large
- * numbers of idle connections, without having to maintain a separate read
- * buffer for each idle connection.
- *
- * It is possible that in some cases, getReadBuffer() may be called
- * multiple times before readDataAvailable() is invoked. In this case, the
- * data will be written to the buffer returned from the most recent call to
- * readDataAvailable(). If the previous calls to readDataAvailable()
- * returned different buffers, the ReadCallback is responsible for ensuring
- * that they are not leaked.
- *
- * If getReadBuffer() throws an exception, returns a nullptr buffer, or
- * returns a 0 length, the ReadCallback will be uninstalled and its
- * readError() method will be invoked.
- *
- * getReadBuffer() is not allowed to change the transport state before it
- * returns. (For example, it should never uninstall the read callback, or
- * set a different read callback.)
- *
- * @param bufReturn getReadBuffer() should update *bufReturn to contain the
- * address of the read buffer. This parameter will never
- * be nullptr.
- * @param lenReturn getReadBuffer() should update *lenReturn to contain the
- * maximum number of bytes that may be written to the read
- * buffer. This parameter will never be nullptr.
- */
- virtual void getReadBuffer(void** bufReturn, size_t* lenReturn) = 0;
-
- /**
- * readDataAvailable() will be invoked when data has been successfully read
- * into the buffer returned by the last call to getReadBuffer().
- *
- * The read callback remains installed after readDataAvailable() returns.
- * It must be explicitly uninstalled to stop receiving read events.
- * getReadBuffer() will be called at least once before each call to
- * readDataAvailable(). getReadBuffer() will also be called before any
- * call to readEOF().
- *
- * @param len The number of bytes placed in the buffer.
- */
- virtual void readDataAvailable(size_t len) noexcept = 0;
-
- /**
- * readEOF() will be invoked when the transport is closed.
- *
- * The read callback will be automatically uninstalled immediately before
- * readEOF() is invoked.
- */
- virtual void readEOF() noexcept = 0;
-
- /**
- * readError() will be invoked if an error occurs reading from the
- * transport.
- *
- * The read callback will be automatically uninstalled immediately before
- * readError() is invoked.
- *
- * @param ex An exception describing the error that occurred.
- */
- virtual void readErr(const AsyncSocketException& ex)
- noexcept = 0;
- };
-
- class WriteCallback {
- public:
- virtual ~WriteCallback() {}
-
- /**
- * writeSuccess() will be invoked when all of the data has been
- * successfully written.
- *
- * Note that this mainly signals that the buffer containing the data to
- * write is no longer needed and may be freed or re-used. It does not
- * guarantee that the data has been fully transmitted to the remote
- * endpoint. For example, on socket-based transports, writeSuccess() only
- * indicates that the data has been given to the kernel for eventual
- * transmission.
- */
- virtual void writeSuccess() noexcept = 0;
-
- /**
- * writeError() will be invoked if an error occurs writing the data.
- *
- * @param bytesWritten The number of bytes that were successfull
- * @param ex An exception describing the error that occurred.
- */
- virtual void writeErr(size_t bytesWritten,
- const AsyncSocketException& ex)
- noexcept = 0;
- };
-
- /**
- * Create a new unconnected AsyncSocket.
- *
- * connect() must later be called on this socket to establish a connection.
- */
- explicit AsyncSocket(EventBase* evb);
-
- void setShutdownSocketSet(ShutdownSocketSet* ss);
-
- /**
- * Create a new AsyncSocket and begin the connection process.
- *
- * @param evb EventBase that will manage this socket.
- * @param address The address to connect to.
- * @param connectTimeout Optional timeout in milliseconds for the connection
- * attempt.
- */
- AsyncSocket(EventBase* evb,
- const folly::SocketAddress& address,
- uint32_t connectTimeout = 0);
-
- /**
- * Create a new AsyncSocket and begin the connection process.
- *
- * @param evb EventBase that will manage this socket.
- * @param ip IP address to connect to (dotted-quad).
- * @param port Destination port in host byte order.
- * @param connectTimeout Optional timeout in milliseconds for the connection
- * attempt.
- */
- AsyncSocket(EventBase* evb,
- const std::string& ip,
- uint16_t port,
- uint32_t connectTimeout = 0);
-
- /**
- * Create a AsyncSocket from an already connected socket file descriptor.
- *
- * Note that while AsyncSocket enables TCP_NODELAY for sockets it creates
- * when connecting, it does not change the socket options when given an
- * existing file descriptor. If callers want TCP_NODELAY enabled when using
- * this version of the constructor, they need to explicitly call
- * setNoDelay(true) after the constructor returns.
- *
- * @param evb EventBase that will manage this socket.
- * @param fd File descriptor to take over (should be a connected socket).
- */
- AsyncSocket(EventBase* evb, int fd);
-
- /**
- * Helper function to create a shared_ptr<AsyncSocket>.
- *
- * This passes in the correct destructor object, since AsyncSocket's
- * destructor is protected and cannot be invoked directly.
- */
- static std::shared_ptr<AsyncSocket> newSocket(EventBase* evb) {
- return std::shared_ptr<AsyncSocket>(new AsyncSocket(evb),
- Destructor());
- }
-
- /**
- * Helper function to create a shared_ptr<AsyncSocket>.
- */
- static std::shared_ptr<AsyncSocket> newSocket(
- EventBase* evb,
- const folly::SocketAddress& address,
- uint32_t connectTimeout = 0) {
- return std::shared_ptr<AsyncSocket>(
- new AsyncSocket(evb, address, connectTimeout),
- Destructor());
- }
-
- /**
- * Helper function to create a shared_ptr<AsyncSocket>.
- */
- static std::shared_ptr<AsyncSocket> newSocket(
- EventBase* evb,
- const std::string& ip,
- uint16_t port,
- uint32_t connectTimeout = 0) {
- return std::shared_ptr<AsyncSocket>(
- new AsyncSocket(evb, ip, port, connectTimeout),
- Destructor());
- }
-
- /**
- * Helper function to create a shared_ptr<AsyncSocket>.
- */
- static std::shared_ptr<AsyncSocket> newSocket(EventBase* evb, int fd) {
- return std::shared_ptr<AsyncSocket>(new AsyncSocket(evb, fd),
- Destructor());
- }
-
- /**
- * Destroy the socket.
- *
- * AsyncSocket::destroy() must be called to destroy the socket.
- * The normal destructor is private, and should not be invoked directly.
- * This prevents callers from deleting a AsyncSocket while it is invoking a
- * callback.
- */
- virtual void destroy();
-
- /**
- * Get the EventBase used by this socket.
- */
- EventBase* getEventBase() const override {
- return eventBase_;
- }
-
- /**
- * Get the file descriptor used by the AsyncSocket.
- */
- virtual int getFd() const {
- return fd_;
- }
-
- /**
- * Extract the file descriptor from the AsyncSocket.
- *
- * This will immediately cause any installed callbacks to be invoked with an
- * error. The AsyncSocket may no longer be used after the file descriptor
- * has been extracted.
- *
- * Returns the file descriptor. The caller assumes ownership of the
- * descriptor, and it will not be closed when the AsyncSocket is destroyed.
- */
- virtual int detachFd();
-
- /**
- * Uniquely identifies a handle to a socket option value. Each
- * combination of level and option name corresponds to one socket
- * option value.
- */
- class OptionKey {
- public:
- bool operator<(const OptionKey& other) const {
- if (level == other.level) {
- return optname < other.optname;
- }
- return level < other.level;
- }
- int apply(int fd, int val) const {
- return setsockopt(fd, level, optname, &val, sizeof(val));
- }
- int level;
- int optname;
- };
-
- // Maps from a socket option key to its value
- typedef std::map<OptionKey, int> OptionMap;
-
- static const OptionMap emptyOptionMap;
- static const folly::SocketAddress anyAddress;
-
- /**
- * Initiate a connection.
- *
- * @param callback The callback to inform when the connection attempt
- * completes.
- * @param address The address to connect to.
- * @param timeout A timeout value, in milliseconds. If the connection
- * does not succeed within this period,
- * callback->connectError() will be invoked.
- */
- virtual void connect(ConnectCallback* callback,
- const folly::SocketAddress& address,
- int timeout = 0,
- const OptionMap &options = emptyOptionMap,
- const folly::SocketAddress& bindAddr = anyAddress
- ) noexcept;
- void connect(ConnectCallback* callback, const std::string& ip, uint16_t port,
- int timeout = 00,
- const OptionMap &options = emptyOptionMap) noexcept;
-
- /**
- * Set the send timeout.
- *
- * If write requests do not make any progress for more than the specified
- * number of milliseconds, fail all pending writes and close the socket.
- *
- * If write requests are currently pending when setSendTimeout() is called,
- * the timeout interval is immediately restarted using the new value.
- *
- * (See the comments for AsyncSocket for an explanation of why AsyncSocket
- * provides setSendTimeout() but not setRecvTimeout().)
- *
- * @param milliseconds The timeout duration, in milliseconds. If 0, no
- * timeout will be used.
- */
- void setSendTimeout(uint32_t milliseconds) override;
-
- /**
- * Get the send timeout.
- *
- * @return Returns the current send timeout, in milliseconds. A return value
- * of 0 indicates that no timeout is set.
- */
- uint32_t getSendTimeout() const override {
- return sendTimeout_;
- }
-
- /**
- * Set the maximum number of reads to execute from the underlying
- * socket each time the EventBase detects that new ingress data is
- * available. The default is unlimited, but callers can use this method
- * to limit the amount of data read from the socket per event loop
- * iteration.
- *
- * @param maxReads Maximum number of reads per data-available event;
- * a value of zero means unlimited.
- */
- void setMaxReadsPerEvent(uint16_t maxReads) {
- maxReadsPerEvent_ = maxReads;
- }
-
- /**
- * Get the maximum number of reads this object will execute from
- * the underlying socket each time the EventBase detects that new
- * ingress data is available.
- *
- * @returns Maximum number of reads per data-available event; a value
- * of zero means unlimited.
- */
- uint16_t getMaxReadsPerEvent() const {
- return maxReadsPerEvent_;
- }
-
- // Read and write methods
- void setReadCB(ReadCallback* callback);
- ReadCallback* getReadCallback() const;
-
- void write(WriteCallback* callback, const void* buf, size_t bytes,
- WriteFlags flags = WriteFlags::NONE);
- void writev(WriteCallback* callback, const iovec* vec, size_t count,
- WriteFlags flags = WriteFlags::NONE);
- void writeChain(WriteCallback* callback,
- std::unique_ptr<folly::IOBuf>&& buf,
- WriteFlags flags = WriteFlags::NONE);
-
- // Methods inherited from AsyncTransport
- void close() override;
- void closeNow() override;
- void closeWithReset() override;
- void shutdownWrite() override;
- void shutdownWriteNow() override;
-
- bool readable() const override;
- bool isPending() const override;
- virtual bool hangup() const;
- bool good() const override;
- bool error() const override;
- void attachEventBase(EventBase* eventBase) override;
- void detachEventBase() override;
- bool isDetachable() const override;
-
- void getLocalAddress(
- folly::SocketAddress* address) const override;
- void getPeerAddress(
- folly::SocketAddress* address) const override;
-
- bool isEorTrackingEnabled() const override { return false; }
-
- void setEorTracking(bool track) override {}
-
- bool connecting() const override {
- return (state_ == StateEnum::CONNECTING);
- }
-
- size_t getAppBytesWritten() const override {
- return appBytesWritten_;
- }
-
- size_t getRawBytesWritten() const override {
- return getAppBytesWritten();
- }
-
- size_t getAppBytesReceived() const override {
- return appBytesReceived_;
- }
-
- size_t getRawBytesReceived() const override {
- return getAppBytesReceived();
- }
-
- // Methods controlling socket options
-
- /**
- * Force writes to be transmitted immediately.
- *
- * This controls the TCP_NODELAY socket option. When enabled, TCP segments
- * are sent as soon as possible, even if it is not a full frame of data.
- * When disabled, the data may be buffered briefly to try and wait for a full
- * frame of data.
- *
- * By default, TCP_NODELAY is enabled for AsyncSocket objects.
- *
- * This method will fail if the socket is not currently open.
- *
- * @return Returns 0 if the TCP_NODELAY flag was successfully updated,
- * or a non-zero errno value on error.
- */
- int setNoDelay(bool noDelay);
-
- /*
- * Set the Flavor of Congestion Control to be used for this Socket
- * Please check '/lib/modules/<kernel>/kernel/net/ipv4' for tcp_*.ko
- * first to make sure the module is available for plugging in
- * Alternatively you can choose from net.ipv4.tcp_allowed_congestion_control
- */
- int setCongestionFlavor(const std::string &cname);
-
- /*
- * Forces ACKs to be sent immediately
- *
- * @return Returns 0 if the TCP_QUICKACK flag was successfully updated,
- * or a non-zero errno value on error.
- */
- int setQuickAck(bool quickack);
-
- /**
- * Set the send bufsize
- */
- int setSendBufSize(size_t bufsize);
-
- /**
- * Set the recv bufsize
- */
- int setRecvBufSize(size_t bufsize);
-
- /**
- * Sets a specific tcp personality
- * Available only on kernels 3.2 and greater
- */
- #define SO_SET_NAMESPACE 41
- int setTCPProfile(int profd);
-
-
- /**
- * Generic API for reading a socket option.
- *
- * @param level same as the "level" parameter in getsockopt().
- * @param optname same as the "optname" parameter in getsockopt().
- * @param optval pointer to the variable in which the option value should
- * be returned.
- * @return same as the return value of getsockopt().
- */
- template <typename T>
- int getSockOpt(int level, int optname, T *optval) {
- return getsockopt(fd_, level, optname, optval, sizeof(T));
- }
-
- /**
- * Generic API for setting a socket option.
- *
- * @param level same as the "level" parameter in getsockopt().
- * @param optname same as the "optname" parameter in getsockopt().
- * @param optval the option value to set.
- * @return same as the return value of setsockopt().
- */
- template <typename T>
- int setSockOpt(int level, int optname, const T *optval) {
- return setsockopt(fd_, level, optname, optval, sizeof(T));
- }
-
- protected:
- enum ReadResultEnum {
- READ_EOF = 0,
- READ_ERROR = -1,
- READ_BLOCKING = -2,
- };
-
- /**
- * Protected destructor.
- *
- * Users of AsyncSocket must never delete it directly. Instead, invoke
- * destroy() instead. (See the documentation in DelayedDestruction.h for
- * more details.)
- */
- ~AsyncSocket();
-
- enum class StateEnum : uint8_t {
- UNINIT,
- CONNECTING,
- ESTABLISHED,
- CLOSED,
- ERROR
- };
-
- friend std::ostream& operator << (std::ostream& os, const StateEnum& state);
-
- enum ShutdownFlags {
- /// shutdownWrite() called, but we are still waiting on writes to drain
- SHUT_WRITE_PENDING = 0x01,
- /// writes have been completely shut down
- SHUT_WRITE = 0x02,
- /**
- * Reads have been shutdown.
- *
- * At the moment we don't distinguish between remote read shutdown
- * (received EOF from the remote end) and local read shutdown. We can
- * only receive EOF when a read callback is set, and we immediately inform
- * it of the EOF. Therefore there doesn't seem to be any reason to have a
- * separate state of "received EOF but the local side may still want to
- * read".
- *
- * We also don't currently provide any API for only shutting down the read
- * side of a socket. (This is a no-op as far as TCP is concerned, anyway.)
- */
- SHUT_READ = 0x04,
- };
-
- class WriteRequest;
-
- class WriteTimeout : public AsyncTimeout {
- public:
- WriteTimeout(AsyncSocket* socket, EventBase* eventBase)
- : AsyncTimeout(eventBase)
- , socket_(socket) {}
-
- virtual void timeoutExpired() noexcept {
- socket_->timeoutExpired();
- }
-
- private:
- AsyncSocket* socket_;
- };
-
- class IoHandler : public EventHandler {
- public:
- IoHandler(AsyncSocket* socket, EventBase* eventBase)
- : EventHandler(eventBase, -1)
- , socket_(socket) {}
- IoHandler(AsyncSocket* socket, EventBase* eventBase, int fd)
- : EventHandler(eventBase, fd)
- , socket_(socket) {}
-
- virtual void handlerReady(uint16_t events) noexcept {
- socket_->ioReady(events);
- }
-
- private:
- AsyncSocket* socket_;
- };
-
- void init();
-
- // event notification methods
- void ioReady(uint16_t events) noexcept;
- virtual void checkForImmediateRead() noexcept;
- virtual void handleInitialReadWrite() noexcept;
- virtual void handleRead() noexcept;
- virtual void handleWrite() noexcept;
- virtual void handleConnect() noexcept;
- void timeoutExpired() noexcept;
-
- /**
- * Attempt to read from the socket.
- *
- * @param buf The buffer to read data into.
- * @param buflen The length of the buffer.
- *
- * @return Returns the number of bytes read, or READ_EOF on EOF, or
- * READ_ERROR on error, or READ_BLOCKING if the operation will
- * block.
- */
- virtual ssize_t performRead(void* buf, size_t buflen);
-
- /**
- * Populate an iovec array from an IOBuf and attempt to write it.
- *
- * @param callback Write completion/error callback.
- * @param vec Target iovec array; caller retains ownership.
- * @param count Number of IOBufs to write, beginning at start of buf.
- * @param buf Chain of iovecs.
- * @param flags set of flags for the underlying write calls, like cork
- */
- void writeChainImpl(WriteCallback* callback, iovec* vec,
- size_t count, std::unique_ptr<folly::IOBuf>&& buf,
- WriteFlags flags);
-
- /**
- * Write as much data as possible to the socket without blocking,
- * and queue up any leftover data to send when the socket can
- * handle writes again.
- *
- * @param callback The callback to invoke when the write is completed.
- * @param vec Array of buffers to write; this method will make a
- * copy of the vector (but not the buffers themselves)
- * if the write has to be completed asynchronously.
- * @param count Number of elements in vec.
- * @param buf The IOBuf that manages the buffers referenced by
- * vec, or a pointer to nullptr if the buffers are not
- * associated with an IOBuf. Note that ownership of
- * the IOBuf is transferred here; upon completion of
- * the write, the AsyncSocket deletes the IOBuf.
- * @param flags Set of write flags.
- */
- void writeImpl(WriteCallback* callback, const iovec* vec, size_t count,
- std::unique_ptr<folly::IOBuf>&& buf,
- WriteFlags flags = WriteFlags::NONE);
-
- /**
- * Attempt to write to the socket.
- *
- * @param vec The iovec array pointing to the buffers to write.
- * @param count The length of the iovec array.
- * @param flags Set of write flags.
- * @param countWritten On return, the value pointed to by this parameter
- * will contain the number of iovec entries that were
- * fully written.
- * @param partialWritten On return, the value pointed to by this parameter
- * will contain the number of bytes written in the
- * partially written iovec entry.
- *
- * @return Returns the total number of bytes written, or -1 on error. If no
- * data can be written immediately, 0 is returned.
- */
- virtual ssize_t performWrite(const iovec* vec, uint32_t count,
- WriteFlags flags, uint32_t* countWritten,
- uint32_t* partialWritten);
-
- bool updateEventRegistration();
-
- /**
- * Update event registration.
- *
- * @param enable Flags of events to enable. Set it to 0 if no events
- * need to be enabled in this call.
- * @param disable Flags of events
- * to disable. Set it to 0 if no events need to be disabled in this
- * call.
- *
- * @return true iff the update is successful.
- */
- bool updateEventRegistration(uint16_t enable, uint16_t disable);
-
- // Actually close the file descriptor and set it to -1 so we don't
- // accidentally close it again.
- void doClose();
-
- // error handling methods
- void startFail();
- void finishFail();
- void fail(const char* fn, const AsyncSocketException& ex);
- void failConnect(const char* fn, const AsyncSocketException& ex);
- void failRead(const char* fn, const AsyncSocketException& ex);
- void failWrite(const char* fn, WriteCallback* callback, size_t bytesWritten,
- const AsyncSocketException& ex);
- void failWrite(const char* fn, const AsyncSocketException& ex);
- void failAllWrites(const AsyncSocketException& ex);
- void invalidState(ConnectCallback* callback);
- void invalidState(ReadCallback* callback);
- void invalidState(WriteCallback* callback);
-
- std::string withAddr(const std::string& s);
-
- StateEnum state_; ///< StateEnum describing current state
- uint8_t shutdownFlags_; ///< Shutdown state (ShutdownFlags)
- uint16_t eventFlags_; ///< EventBase::HandlerFlags settings
- int fd_; ///< The socket file descriptor
- mutable
- folly::SocketAddress addr_; ///< The address we tried to connect to
- uint32_t sendTimeout_; ///< The send timeout, in milliseconds
- uint16_t maxReadsPerEvent_; ///< Max reads per event loop iteration
- EventBase* eventBase_; ///< The EventBase
- WriteTimeout writeTimeout_; ///< A timeout for connect and write
- IoHandler ioHandler_; ///< A EventHandler to monitor the fd
-
- ConnectCallback* connectCallback_; ///< ConnectCallback
- ReadCallback* readCallback_; ///< ReadCallback
- WriteRequest* writeReqHead_; ///< Chain of WriteRequests
- WriteRequest* writeReqTail_; ///< End of WriteRequest chain
- ShutdownSocketSet* shutdownSocketSet_;
- size_t appBytesReceived_; ///< Num of bytes received from socket
- size_t appBytesWritten_; ///< Num of bytes written to socket
-};
-
-
-} // folly