Move AsyncSocket to folly (try 2)
authorDave Watson <davejwatson@fb.com>
Tue, 28 Oct 2014 17:52:49 +0000 (10:52 -0700)
committerdcsommer <dcsommer@fb.com>
Wed, 29 Oct 2014 23:07:42 +0000 (16:07 -0700)
Summary:

Test Plan:

Reviewed By: dcsommer@fb.com

Subscribers: trunkagent, mcduff, hitesh, doug, alandau, bmatheny, njormrod, mshneer, folly-diffs@

FB internal diff: D1644071

Signature: t1:1644071:1414526899:c41dd55e2957a7e1fcc54508e20cdb4a9c8c30d4

folly/Makefile.am
folly/io/async/AsyncSocket.cpp [new file with mode: 0644]
folly/io/async/AsyncSocket.h [new file with mode: 0644]
folly/io/async/AsyncSocketException.h [new file with mode: 0644]
folly/io/async/AsyncTransport.h [new file with mode: 0644]

index 32752d2009e52852d6fbc65bf35edca5b0828d0b..76ad6a30ea02aad1809f2cd16b402b2c1cb1f065 100644 (file)
@@ -132,6 +132,8 @@ nobase_follyinclude_HEADERS = \
        io/async/AsyncTimeout.h \
        io/async/AsyncTransport.h \
        io/async/AsyncServerSocket.h \
+       io/async/AsyncSocket.h \
+       io/async/AsyncSocketException.h \
        io/async/DelayedDestruction.h \
        io/async/EventBase.h \
        io/async/EventBaseManager.h \
@@ -262,6 +264,7 @@ libfolly_la_SOURCES = \
        io/ShutdownSocketSet.cpp \
        io/async/AsyncTimeout.cpp \
        io/async/AsyncServerSocket.cpp \
+       io/async/AsyncSocket.cpp \
        io/async/EventBase.cpp \
        io/async/EventBaseManager.cpp \
        io/async/EventHandler.cpp \
diff --git a/folly/io/async/AsyncSocket.cpp b/folly/io/async/AsyncSocket.cpp
new file mode 100644 (file)
index 0000000..69cfe86
--- /dev/null
@@ -0,0 +1,1967 @@
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <folly/io/async/AsyncSocket.h>
+
+#include <folly/io/async/EventBase.h>
+#include <folly/SocketAddress.h>
+#include <folly/io/IOBuf.h>
+
+#include <poll.h>
+#include <errno.h>
+#include <limits.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+
+using std::string;
+using std::unique_ptr;
+
+namespace folly {
+
+// static members initializers
+const AsyncSocket::OptionMap AsyncSocket::emptyOptionMap;
+const folly::SocketAddress AsyncSocket::anyAddress =
+  folly::SocketAddress("0.0.0.0", 0);
+
+const AsyncSocketException socketClosedLocallyEx(
+    AsyncSocketException::END_OF_FILE, "socket closed locally");
+const AsyncSocketException socketShutdownForWritesEx(
+    AsyncSocketException::END_OF_FILE, "socket shutdown for writes");
+
+// TODO: It might help performance to provide a version of WriteRequest that
+// users could derive from, so we can avoid the extra allocation for each call
+// to write()/writev().  We could templatize TFramedAsyncChannel just like the
+// protocols are currently templatized for transports.
+//
+// We would need the version for external users where they provide the iovec
+// storage space, and only our internal version would allocate it at the end of
+// the WriteRequest.
+
+/**
+ * A WriteRequest object tracks information about a pending write() or writev()
+ * operation.
+ *
+ * A new WriteRequest operation is allocated on the heap for all write
+ * operations that cannot be completed immediately.
+ */
+class AsyncSocket::WriteRequest {
+ public:
+  static WriteRequest* newRequest(WriteCallback* callback,
+                                  const iovec* ops,
+                                  uint32_t opCount,
+                                  unique_ptr<IOBuf>&& ioBuf,
+                                  WriteFlags flags) {
+    assert(opCount > 0);
+    // Since we put a variable size iovec array at the end
+    // of each WriteRequest, we have to manually allocate the memory.
+    void* buf = malloc(sizeof(WriteRequest) +
+                       (opCount * sizeof(struct iovec)));
+    if (buf == nullptr) {
+      throw std::bad_alloc();
+    }
+
+    return new(buf) WriteRequest(callback, ops, opCount, std::move(ioBuf),
+                                 flags);
+  }
+
+  void destroy() {
+    this->~WriteRequest();
+    free(this);
+  }
+
+  bool cork() const {
+    return isSet(flags_, WriteFlags::CORK);
+  }
+
+  WriteFlags flags() const {
+    return flags_;
+  }
+
+  WriteRequest* getNext() const {
+    return next_;
+  }
+
+  WriteCallback* getCallback() const {
+    return callback_;
+  }
+
+  uint32_t getBytesWritten() const {
+    return bytesWritten_;
+  }
+
+  const struct iovec* getOps() const {
+    assert(opCount_ > opIndex_);
+    return writeOps_ + opIndex_;
+  }
+
+  uint32_t getOpCount() const {
+    assert(opCount_ > opIndex_);
+    return opCount_ - opIndex_;
+  }
+
+  void consume(uint32_t wholeOps, uint32_t partialBytes,
+               uint32_t totalBytesWritten) {
+    // Advance opIndex_ forward by wholeOps
+    opIndex_ += wholeOps;
+    assert(opIndex_ < opCount_);
+
+    // If we've finished writing any IOBufs, release them
+    if (ioBuf_) {
+      for (uint32_t i = wholeOps; i != 0; --i) {
+        assert(ioBuf_);
+        ioBuf_ = ioBuf_->pop();
+      }
+    }
+
+    // Move partialBytes forward into the current iovec buffer
+    struct iovec* currentOp = writeOps_ + opIndex_;
+    assert((partialBytes < currentOp->iov_len) || (currentOp->iov_len == 0));
+    currentOp->iov_base =
+      reinterpret_cast<uint8_t*>(currentOp->iov_base) + partialBytes;
+    currentOp->iov_len -= partialBytes;
+
+    // Increment the bytesWritten_ count by totalBytesWritten
+    bytesWritten_ += totalBytesWritten;
+  }
+
+  void append(WriteRequest* next) {
+    assert(next_ == nullptr);
+    next_ = next;
+  }
+
+ private:
+  WriteRequest(WriteCallback* callback,
+               const struct iovec* ops,
+               uint32_t opCount,
+               unique_ptr<IOBuf>&& ioBuf,
+               WriteFlags flags)
+    : next_(nullptr)
+    , callback_(callback)
+    , bytesWritten_(0)
+    , opCount_(opCount)
+    , opIndex_(0)
+    , flags_(flags)
+    , ioBuf_(std::move(ioBuf)) {
+    memcpy(writeOps_, ops, sizeof(*ops) * opCount_);
+  }
+
+  // Private destructor, to ensure callers use destroy()
+  ~WriteRequest() {}
+
+  WriteRequest* next_;          ///< pointer to next WriteRequest
+  WriteCallback* callback_;     ///< completion callback
+  uint32_t bytesWritten_;       ///< bytes written
+  uint32_t opCount_;            ///< number of entries in writeOps_
+  uint32_t opIndex_;            ///< current index into writeOps_
+  WriteFlags flags_;            ///< set for WriteFlags
+  unique_ptr<IOBuf> ioBuf_;     ///< underlying IOBuf, or nullptr if N/A
+  struct iovec writeOps_[];     ///< write operation(s) list
+};
+
+AsyncSocket::AsyncSocket(EventBase* evb)
+  : eventBase_(evb)
+  , writeTimeout_(this, evb)
+  , ioHandler_(this, evb) {
+  VLOG(5) << "new AsyncSocket(" << this << ", evb=" << evb << ")";
+  init();
+}
+
+AsyncSocket::AsyncSocket(EventBase* evb,
+                           const folly::SocketAddress& address,
+                           uint32_t connectTimeout)
+  : eventBase_(evb)
+  , writeTimeout_(this, evb)
+  , ioHandler_(this, evb) {
+  VLOG(5) << "new AsyncSocket(" << this << ", evb=" << evb << ")";
+  init();
+  connect(nullptr, address, connectTimeout);
+}
+
+AsyncSocket::AsyncSocket(EventBase* evb,
+                           const std::string& ip,
+                           uint16_t port,
+                           uint32_t connectTimeout)
+  : eventBase_(evb)
+  , writeTimeout_(this, evb)
+  , ioHandler_(this, evb) {
+  VLOG(5) << "new AsyncSocket(" << this << ", evb=" << evb << ")";
+  init();
+  connect(nullptr, ip, port, connectTimeout);
+}
+
+AsyncSocket::AsyncSocket(EventBase* evb, int fd)
+  : eventBase_(evb)
+  , writeTimeout_(this, evb)
+  , ioHandler_(this, evb, fd) {
+  VLOG(5) << "new AsyncSocket(" << this << ", evb=" << evb << ", fd="
+          << fd << ")";
+  init();
+  fd_ = fd;
+  state_ = StateEnum::ESTABLISHED;
+}
+
+// init() method, since constructor forwarding isn't supported in most
+// compilers yet.
+void AsyncSocket::init() {
+  assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
+  shutdownFlags_ = 0;
+  state_ = StateEnum::UNINIT;
+  eventFlags_ = EventHandler::NONE;
+  fd_ = -1;
+  sendTimeout_ = 0;
+  maxReadsPerEvent_ = 0;
+  connectCallback_ = nullptr;
+  readCallback_ = nullptr;
+  writeReqHead_ = nullptr;
+  writeReqTail_ = nullptr;
+  shutdownSocketSet_ = nullptr;
+  appBytesWritten_ = 0;
+  appBytesReceived_ = 0;
+}
+
+AsyncSocket::~AsyncSocket() {
+  VLOG(7) << "actual destruction of AsyncSocket(this=" << this
+          << ", evb=" << eventBase_ << ", fd=" << fd_
+          << ", state=" << state_ << ")";
+}
+
+void AsyncSocket::destroy() {
+  VLOG(5) << "AsyncSocket::destroy(this=" << this << ", evb=" << eventBase_
+          << ", fd=" << fd_ << ", state=" << state_;
+  // When destroy is called, close the socket immediately
+  closeNow();
+
+  // Then call DelayedDestruction::destroy() to take care of
+  // whether or not we need immediate or delayed destruction
+  DelayedDestruction::destroy();
+}
+
+int AsyncSocket::detachFd() {
+  VLOG(6) << "AsyncSocket::detachFd(this=" << this << ", fd=" << fd_
+          << ", evb=" << eventBase_ << ", state=" << state_
+          << ", events=" << std::hex << eventFlags_ << ")";
+  // Extract the fd, and set fd_ to -1 first, so closeNow() won't
+  // actually close the descriptor.
+  if (shutdownSocketSet_) {
+    shutdownSocketSet_->remove(fd_);
+  }
+  int fd = fd_;
+  fd_ = -1;
+  // Call closeNow() to invoke all pending callbacks with an error.
+  closeNow();
+  // Update the EventHandler to stop using this fd.
+  // This can only be done after closeNow() unregisters the handler.
+  ioHandler_.changeHandlerFD(-1);
+  return fd;
+}
+
+void AsyncSocket::setShutdownSocketSet(ShutdownSocketSet* newSS) {
+  if (shutdownSocketSet_ == newSS) {
+    return;
+  }
+  if (shutdownSocketSet_ && fd_ != -1) {
+    shutdownSocketSet_->remove(fd_);
+  }
+  shutdownSocketSet_ = newSS;
+  if (shutdownSocketSet_ && fd_ != -1) {
+    shutdownSocketSet_->add(fd_);
+  }
+}
+
+void AsyncSocket::connect(ConnectCallback* callback,
+                           const folly::SocketAddress& address,
+                           int timeout,
+                           const OptionMap &options,
+                           const folly::SocketAddress& bindAddr) noexcept {
+  DestructorGuard dg(this);
+  assert(eventBase_->isInEventBaseThread());
+
+  addr_ = address;
+
+  // Make sure we're in the uninitialized state
+  if (state_ != StateEnum::UNINIT) {
+    return invalidState(callback);
+  }
+
+  assert(fd_ == -1);
+  state_ = StateEnum::CONNECTING;
+  connectCallback_ = callback;
+
+  sockaddr_storage addrStorage;
+  sockaddr* saddr = reinterpret_cast<sockaddr*>(&addrStorage);
+
+  try {
+    // Create the socket
+    // Technically the first parameter should actually be a protocol family
+    // constant (PF_xxx) rather than an address family (AF_xxx), but the
+    // distinction is mainly just historical.  In pretty much all
+    // implementations the PF_foo and AF_foo constants are identical.
+    fd_ = socket(address.getFamily(), SOCK_STREAM, 0);
+    if (fd_ < 0) {
+      throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
+                                withAddr("failed to create socket"), errno);
+    }
+    if (shutdownSocketSet_) {
+      shutdownSocketSet_->add(fd_);
+    }
+    ioHandler_.changeHandlerFD(fd_);
+
+    // Set the FD_CLOEXEC flag so that the socket will be closed if the program
+    // later forks and execs.
+    int rv = fcntl(fd_, F_SETFD, FD_CLOEXEC);
+    if (rv != 0) {
+      throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
+                                withAddr("failed to set close-on-exec flag"),
+                                errno);
+    }
+
+    // Put the socket in non-blocking mode
+    int flags = fcntl(fd_, F_GETFL, 0);
+    if (flags == -1) {
+      throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
+                                withAddr("failed to get socket flags"), errno);
+    }
+    rv = fcntl(fd_, F_SETFL, flags | O_NONBLOCK);
+    if (rv == -1) {
+      throw AsyncSocketException(
+          AsyncSocketException::INTERNAL_ERROR,
+          withAddr("failed to put socket in non-blocking mode"),
+          errno);
+    }
+
+#if !defined(MSG_NOSIGNAL) && defined(F_SETNOSIGPIPE)
+    // iOS and OS X don't support MSG_NOSIGNAL; set F_SETNOSIGPIPE instead
+    rv = fcntl(fd_, F_SETNOSIGPIPE, 1);
+    if (rv == -1) {
+      throw AsyncSocketException(
+          AsyncSocketException::INTERNAL_ERROR,
+          "failed to enable F_SETNOSIGPIPE on socket",
+          errno);
+    }
+#endif
+
+    // By default, turn on TCP_NODELAY
+    // If setNoDelay() fails, we continue anyway; this isn't a fatal error.
+    // setNoDelay() will log an error message if it fails.
+    if (address.getFamily() != AF_UNIX) {
+      (void)setNoDelay(true);
+    }
+
+    VLOG(5) << "AsyncSocket::connect(this=" << this << ", evb=" << eventBase_
+            << ", fd=" << fd_ << ", host=" << address.describe().c_str();
+
+    // bind the socket
+    if (bindAddr != anyAddress) {
+      int one = 1;
+      if (::setsockopt(fd_, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one))) {
+        doClose();
+        throw AsyncSocketException(
+          AsyncSocketException::NOT_OPEN,
+          "failed to setsockopt prior to bind on " + bindAddr.describe(),
+          errno);
+      }
+
+      bindAddr.getAddress(&addrStorage);
+
+      if (::bind(fd_, saddr, bindAddr.getActualSize()) != 0) {
+        doClose();
+        throw AsyncSocketException(AsyncSocketException::NOT_OPEN,
+                                  "failed to bind to async socket: " +
+                                  bindAddr.describe(),
+                                  errno);
+      }
+    }
+
+    // Apply the additional options if any.
+    for (const auto& opt: options) {
+      int rv = opt.first.apply(fd_, opt.second);
+      if (rv != 0) {
+        throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
+                                  withAddr("failed to set socket option"),
+                                  errno);
+      }
+    }
+
+    // Perform the connect()
+    address.getAddress(&addrStorage);
+
+    rv = ::connect(fd_, saddr, address.getActualSize());
+    if (rv < 0) {
+      if (errno == EINPROGRESS) {
+        // Connection in progress.
+        if (timeout > 0) {
+          // Start a timer in case the connection takes too long.
+          if (!writeTimeout_.scheduleTimeout(timeout)) {
+            throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
+                withAddr("failed to schedule AsyncSocket connect timeout"));
+          }
+        }
+
+        // Register for write events, so we'll
+        // be notified when the connection finishes/fails.
+        // Note that we don't register for a persistent event here.
+        assert(eventFlags_ == EventHandler::NONE);
+        eventFlags_ = EventHandler::WRITE;
+        if (!ioHandler_.registerHandler(eventFlags_)) {
+          throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
+              withAddr("failed to register AsyncSocket connect handler"));
+        }
+        return;
+      } else {
+        throw AsyncSocketException(AsyncSocketException::NOT_OPEN,
+                                  "connect failed (immediately)", errno);
+      }
+    }
+
+    // If we're still here the connect() succeeded immediately.
+    // Fall through to call the callback outside of this try...catch block
+  } catch (const AsyncSocketException& ex) {
+    return failConnect(__func__, ex);
+  } catch (const std::exception& ex) {
+    // shouldn't happen, but handle it just in case
+    VLOG(4) << "AsyncSocket::connect(this=" << this << ", fd=" << fd_
+               << "): unexpected " << typeid(ex).name() << " exception: "
+               << ex.what();
+    AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR,
+                            withAddr(string("unexpected exception: ") +
+                                     ex.what()));
+    return failConnect(__func__, tex);
+  }
+
+  // The connection succeeded immediately
+  // The read callback may not have been set yet, and no writes may be pending
+  // yet, so we don't have to register for any events at the moment.
+  VLOG(8) << "AsyncSocket::connect succeeded immediately; this=" << this;
+  assert(readCallback_ == nullptr);
+  assert(writeReqHead_ == nullptr);
+  state_ = StateEnum::ESTABLISHED;
+  if (callback) {
+    connectCallback_ = nullptr;
+    callback->connectSuccess();
+  }
+}
+
+void AsyncSocket::connect(ConnectCallback* callback,
+                           const string& ip, uint16_t port,
+                           int timeout,
+                           const OptionMap &options) noexcept {
+  DestructorGuard dg(this);
+  try {
+    connectCallback_ = callback;
+    connect(callback, folly::SocketAddress(ip, port), timeout, options);
+  } catch (const std::exception& ex) {
+    AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR,
+                            ex.what());
+    return failConnect(__func__, tex);
+  }
+}
+
+void AsyncSocket::setSendTimeout(uint32_t milliseconds) {
+  sendTimeout_ = milliseconds;
+  assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
+
+  // If we are currently pending on write requests, immediately update
+  // writeTimeout_ with the new value.
+  if ((eventFlags_ & EventHandler::WRITE) &&
+      (state_ != StateEnum::CONNECTING)) {
+    assert(state_ == StateEnum::ESTABLISHED);
+    assert((shutdownFlags_ & SHUT_WRITE) == 0);
+    if (sendTimeout_ > 0) {
+      if (!writeTimeout_.scheduleTimeout(sendTimeout_)) {
+        AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
+            withAddr("failed to reschedule send timeout in setSendTimeout"));
+        return failWrite(__func__, ex);
+      }
+    } else {
+      writeTimeout_.cancelTimeout();
+    }
+  }
+}
+
+void AsyncSocket::setReadCB(ReadCallback *callback) {
+  VLOG(6) << "AsyncSocket::setReadCallback() this=" << this << ", fd=" << fd_
+          << ", callback=" << callback << ", state=" << state_;
+
+  // Short circuit if callback is the same as the existing readCallback_.
+  //
+  // Note that this is needed for proper functioning during some cleanup cases.
+  // During cleanup we allow setReadCallback(nullptr) to be called even if the
+  // read callback is already unset and we have been detached from an event
+  // base.  This check prevents us from asserting
+  // eventBase_->isInEventBaseThread() when eventBase_ is nullptr.
+  if (callback == readCallback_) {
+    return;
+  }
+
+  if (shutdownFlags_ & SHUT_READ) {
+    // Reads have already been shut down on this socket.
+    //
+    // Allow setReadCallback(nullptr) to be called in this case, but don't
+    // allow a new callback to be set.
+    //
+    // For example, setReadCallback(nullptr) can happen after an error if we
+    // invoke some other error callback before invoking readError().  The other
+    // error callback that is invoked first may go ahead and clear the read
+    // callback before we get a chance to invoke readError().
+    if (callback != nullptr) {
+      return invalidState(callback);
+    }
+    assert((eventFlags_ & EventHandler::READ) == 0);
+    readCallback_ = nullptr;
+    return;
+  }
+
+  DestructorGuard dg(this);
+  assert(eventBase_->isInEventBaseThread());
+
+  switch ((StateEnum)state_) {
+    case StateEnum::CONNECTING:
+      // For convenience, we allow the read callback to be set while we are
+      // still connecting.  We just store the callback for now.  Once the
+      // connection completes we'll register for read events.
+      readCallback_ = callback;
+      return;
+    case StateEnum::ESTABLISHED:
+    {
+      readCallback_ = callback;
+      uint16_t oldFlags = eventFlags_;
+      if (readCallback_) {
+        eventFlags_ |= EventHandler::READ;
+      } else {
+        eventFlags_ &= ~EventHandler::READ;
+      }
+
+      // Update our registration if our flags have changed
+      if (eventFlags_ != oldFlags) {
+        // We intentionally ignore the return value here.
+        // updateEventRegistration() will move us into the error state if it
+        // fails, and we don't need to do anything else here afterwards.
+        (void)updateEventRegistration();
+      }
+
+      if (readCallback_) {
+        checkForImmediateRead();
+      }
+      return;
+    }
+    case StateEnum::CLOSED:
+    case StateEnum::ERROR:
+      // We should never reach here.  SHUT_READ should always be set
+      // if we are in STATE_CLOSED or STATE_ERROR.
+      assert(false);
+      return invalidState(callback);
+    case StateEnum::UNINIT:
+      // We do not allow setReadCallback() to be called before we start
+      // connecting.
+      return invalidState(callback);
+  }
+
+  // We don't put a default case in the switch statement, so that the compiler
+  // will warn us to update the switch statement if a new state is added.
+  return invalidState(callback);
+}
+
+AsyncSocket::ReadCallback* AsyncSocket::getReadCallback() const {
+  return readCallback_;
+}
+
+void AsyncSocket::write(WriteCallback* callback,
+                         const void* buf, size_t bytes, WriteFlags flags) {
+  iovec op;
+  op.iov_base = const_cast<void*>(buf);
+  op.iov_len = bytes;
+  writeImpl(callback, &op, 1, std::move(unique_ptr<IOBuf>()), flags);
+}
+
+void AsyncSocket::writev(WriteCallback* callback,
+                          const iovec* vec,
+                          size_t count,
+                          WriteFlags flags) {
+  writeImpl(callback, vec, count, std::move(unique_ptr<IOBuf>()), flags);
+}
+
+void AsyncSocket::writeChain(WriteCallback* callback, unique_ptr<IOBuf>&& buf,
+                              WriteFlags flags) {
+  size_t count = buf->countChainElements();
+  if (count <= 64) {
+    iovec vec[count];
+    writeChainImpl(callback, vec, count, std::move(buf), flags);
+  } else {
+    iovec* vec = new iovec[count];
+    writeChainImpl(callback, vec, count, std::move(buf), flags);
+    delete[] vec;
+  }
+}
+
+void AsyncSocket::writeChainImpl(WriteCallback* callback, iovec* vec,
+    size_t count, unique_ptr<IOBuf>&& buf, WriteFlags flags) {
+  const IOBuf* head = buf.get();
+  const IOBuf* next = head;
+  unsigned i = 0;
+  do {
+    vec[i].iov_base = const_cast<uint8_t *>(next->data());
+    vec[i].iov_len = next->length();
+    // IOBuf can get confused by empty iovec buffers, so increment the
+    // output pointer only if the iovec buffer is non-empty.  We could
+    // end the loop with i < count, but that's ok.
+    if (vec[i].iov_len != 0) {
+      i++;
+    }
+    next = next->next();
+  } while (next != head);
+  writeImpl(callback, vec, i, std::move(buf), flags);
+}
+
+void AsyncSocket::writeImpl(WriteCallback* callback, const iovec* vec,
+                             size_t count, unique_ptr<IOBuf>&& buf,
+                             WriteFlags flags) {
+  VLOG(6) << "AsyncSocket::writev() this=" << this << ", fd=" << fd_
+          << ", callback=" << callback << ", count=" << count
+          << ", state=" << state_;
+  DestructorGuard dg(this);
+  unique_ptr<IOBuf>ioBuf(std::move(buf));
+  assert(eventBase_->isInEventBaseThread());
+
+  if (shutdownFlags_ & (SHUT_WRITE | SHUT_WRITE_PENDING)) {
+    // No new writes may be performed after the write side of the socket has
+    // been shutdown.
+    //
+    // We could just call callback->writeError() here to fail just this write.
+    // However, fail hard and use invalidState() to fail all outstanding
+    // callbacks and move the socket into the error state.  There's most likely
+    // a bug in the caller's code, so we abort everything rather than trying to
+    // proceed as best we can.
+    return invalidState(callback);
+  }
+
+  uint32_t countWritten = 0;
+  uint32_t partialWritten = 0;
+  int bytesWritten = 0;
+  bool mustRegister = false;
+  if (state_ == StateEnum::ESTABLISHED && !connecting()) {
+    if (writeReqHead_ == nullptr) {
+      // If we are established and there are no other writes pending,
+      // we can attempt to perform the write immediately.
+      assert(writeReqTail_ == nullptr);
+      assert((eventFlags_ & EventHandler::WRITE) == 0);
+
+      bytesWritten = performWrite(vec, count, flags,
+                                  &countWritten, &partialWritten);
+      if (bytesWritten < 0) {
+        AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
+                               withAddr("writev failed"), errno);
+        return failWrite(__func__, callback, 0, ex);
+      } else if (countWritten == count) {
+        // We successfully wrote everything.
+        // Invoke the callback and return.
+        if (callback) {
+          callback->writeSuccess();
+        }
+        return;
+      } // else { continue writing the next writeReq }
+      mustRegister = true;
+    }
+  } else if (!connecting()) {
+    // Invalid state for writing
+    return invalidState(callback);
+  }
+
+  // Create a new WriteRequest to add to the queue
+  WriteRequest* req;
+  try {
+    req = WriteRequest::newRequest(callback, vec + countWritten,
+                                   count - countWritten, std::move(ioBuf),
+                                   flags);
+  } catch (const std::exception& ex) {
+    // we mainly expect to catch std::bad_alloc here
+    AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR,
+        withAddr(string("failed to append new WriteRequest: ") + ex.what()));
+    return failWrite(__func__, callback, bytesWritten, tex);
+  }
+  req->consume(0, partialWritten, bytesWritten);
+  if (writeReqTail_ == nullptr) {
+    assert(writeReqHead_ == nullptr);
+    writeReqHead_ = writeReqTail_ = req;
+  } else {
+    writeReqTail_->append(req);
+    writeReqTail_ = req;
+  }
+
+  // Register for write events if are established and not currently
+  // waiting on write events
+  if (mustRegister) {
+    assert(state_ == StateEnum::ESTABLISHED);
+    assert((eventFlags_ & EventHandler::WRITE) == 0);
+    if (!updateEventRegistration(EventHandler::WRITE, 0)) {
+      assert(state_ == StateEnum::ERROR);
+      return;
+    }
+    if (sendTimeout_ > 0) {
+      // Schedule a timeout to fire if the write takes too long.
+      if (!writeTimeout_.scheduleTimeout(sendTimeout_)) {
+        AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
+                               withAddr("failed to schedule send timeout"));
+        return failWrite(__func__, ex);
+      }
+    }
+  }
+}
+
+void AsyncSocket::close() {
+  VLOG(5) << "AsyncSocket::close(): this=" << this << ", fd_=" << fd_
+          << ", state=" << state_ << ", shutdownFlags="
+          << std::hex << (int) shutdownFlags_;
+
+  // close() is only different from closeNow() when there are pending writes
+  // that need to drain before we can close.  In all other cases, just call
+  // closeNow().
+  //
+  // Note that writeReqHead_ can be non-nullptr even in STATE_CLOSED or
+  // STATE_ERROR if close() is invoked while a previous closeNow() or failure
+  // is still running.  (e.g., If there are multiple pending writes, and we
+  // call writeError() on the first one, it may call close().  In this case we
+  // will already be in STATE_CLOSED or STATE_ERROR, but the remaining pending
+  // writes will still be in the queue.)
+  //
+  // We only need to drain pending writes if we are still in STATE_CONNECTING
+  // or STATE_ESTABLISHED
+  if ((writeReqHead_ == nullptr) ||
+      !(state_ == StateEnum::CONNECTING ||
+      state_ == StateEnum::ESTABLISHED)) {
+    closeNow();
+    return;
+  }
+
+  // Declare a DestructorGuard to ensure that the AsyncSocket cannot be
+  // destroyed until close() returns.
+  DestructorGuard dg(this);
+  assert(eventBase_->isInEventBaseThread());
+
+  // Since there are write requests pending, we have to set the
+  // SHUT_WRITE_PENDING flag, and wait to perform the real close until the
+  // connect finishes and we finish writing these requests.
+  //
+  // Set SHUT_READ to indicate that reads are shut down, and set the
+  // SHUT_WRITE_PENDING flag to mark that we want to shutdown once the
+  // pending writes complete.
+  shutdownFlags_ |= (SHUT_READ | SHUT_WRITE_PENDING);
+
+  // If a read callback is set, invoke readEOF() immediately to inform it that
+  // the socket has been closed and no more data can be read.
+  if (readCallback_) {
+    // Disable reads if they are enabled
+    if (!updateEventRegistration(0, EventHandler::READ)) {
+      // We're now in the error state; callbacks have been cleaned up
+      assert(state_ == StateEnum::ERROR);
+      assert(readCallback_ == nullptr);
+    } else {
+      ReadCallback* callback = readCallback_;
+      readCallback_ = nullptr;
+      callback->readEOF();
+    }
+  }
+}
+
+void AsyncSocket::closeNow() {
+  VLOG(5) << "AsyncSocket::closeNow(): this=" << this << ", fd_=" << fd_
+          << ", state=" << state_ << ", shutdownFlags="
+          << std::hex << (int) shutdownFlags_;
+  DestructorGuard dg(this);
+  assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
+
+  switch (state_) {
+    case StateEnum::ESTABLISHED:
+    case StateEnum::CONNECTING:
+    {
+      shutdownFlags_ |= (SHUT_READ | SHUT_WRITE);
+      state_ = StateEnum::CLOSED;
+
+      // If the write timeout was set, cancel it.
+      writeTimeout_.cancelTimeout();
+
+      // If we are registered for I/O events, unregister.
+      if (eventFlags_ != EventHandler::NONE) {
+        eventFlags_ = EventHandler::NONE;
+        if (!updateEventRegistration()) {
+          // We will have been moved into the error state.
+          assert(state_ == StateEnum::ERROR);
+          return;
+        }
+      }
+
+      if (fd_ >= 0) {
+        ioHandler_.changeHandlerFD(-1);
+        doClose();
+      }
+
+      if (connectCallback_) {
+        ConnectCallback* callback = connectCallback_;
+        connectCallback_ = nullptr;
+        callback->connectErr(socketClosedLocallyEx);
+      }
+
+      failAllWrites(socketClosedLocallyEx);
+
+      if (readCallback_) {
+        ReadCallback* callback = readCallback_;
+        readCallback_ = nullptr;
+        callback->readEOF();
+      }
+      return;
+    }
+    case StateEnum::CLOSED:
+      // Do nothing.  It's possible that we are being called recursively
+      // from inside a callback that we invoked inside another call to close()
+      // that is still running.
+      return;
+    case StateEnum::ERROR:
+      // Do nothing.  The error handling code has performed (or is performing)
+      // cleanup.
+      return;
+    case StateEnum::UNINIT:
+      assert(eventFlags_ == EventHandler::NONE);
+      assert(connectCallback_ == nullptr);
+      assert(readCallback_ == nullptr);
+      assert(writeReqHead_ == nullptr);
+      shutdownFlags_ |= (SHUT_READ | SHUT_WRITE);
+      state_ = StateEnum::CLOSED;
+      return;
+  }
+
+  LOG(DFATAL) << "AsyncSocket::closeNow() (this=" << this << ", fd=" << fd_
+              << ") called in unknown state " << state_;
+}
+
+void AsyncSocket::closeWithReset() {
+  // Enable SO_LINGER, with the linger timeout set to 0.
+  // This will trigger a TCP reset when we close the socket.
+  if (fd_ >= 0) {
+    struct linger optLinger = {1, 0};
+    if (setSockOpt(SOL_SOCKET, SO_LINGER, &optLinger) != 0) {
+      VLOG(2) << "AsyncSocket::closeWithReset(): error setting SO_LINGER "
+              << "on " << fd_ << ": errno=" << errno;
+    }
+  }
+
+  // Then let closeNow() take care of the rest
+  closeNow();
+}
+
+void AsyncSocket::shutdownWrite() {
+  VLOG(5) << "AsyncSocket::shutdownWrite(): this=" << this << ", fd=" << fd_
+          << ", state=" << state_ << ", shutdownFlags="
+          << std::hex << (int) shutdownFlags_;
+
+  // If there are no pending writes, shutdownWrite() is identical to
+  // shutdownWriteNow().
+  if (writeReqHead_ == nullptr) {
+    shutdownWriteNow();
+    return;
+  }
+
+  assert(eventBase_->isInEventBaseThread());
+
+  // There are pending writes.  Set SHUT_WRITE_PENDING so that the actual
+  // shutdown will be performed once all writes complete.
+  shutdownFlags_ |= SHUT_WRITE_PENDING;
+}
+
+void AsyncSocket::shutdownWriteNow() {
+  VLOG(5) << "AsyncSocket::shutdownWriteNow(): this=" << this
+          << ", fd=" << fd_ << ", state=" << state_
+          << ", shutdownFlags=" << std::hex << (int) shutdownFlags_;
+
+  if (shutdownFlags_ & SHUT_WRITE) {
+    // Writes are already shutdown; nothing else to do.
+    return;
+  }
+
+  // If SHUT_READ is already set, just call closeNow() to completely
+  // close the socket.  This can happen if close() was called with writes
+  // pending, and then shutdownWriteNow() is called before all pending writes
+  // complete.
+  if (shutdownFlags_ & SHUT_READ) {
+    closeNow();
+    return;
+  }
+
+  DestructorGuard dg(this);
+  assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
+
+  switch (static_cast<StateEnum>(state_)) {
+    case StateEnum::ESTABLISHED:
+    {
+      shutdownFlags_ |= SHUT_WRITE;
+
+      // If the write timeout was set, cancel it.
+      writeTimeout_.cancelTimeout();
+
+      // If we are registered for write events, unregister.
+      if (!updateEventRegistration(0, EventHandler::WRITE)) {
+        // We will have been moved into the error state.
+        assert(state_ == StateEnum::ERROR);
+        return;
+      }
+
+      // Shutdown writes on the file descriptor
+      ::shutdown(fd_, SHUT_WR);
+
+      // Immediately fail all write requests
+      failAllWrites(socketShutdownForWritesEx);
+      return;
+    }
+    case StateEnum::CONNECTING:
+    {
+      // Set the SHUT_WRITE_PENDING flag.
+      // When the connection completes, it will check this flag,
+      // shutdown the write half of the socket, and then set SHUT_WRITE.
+      shutdownFlags_ |= SHUT_WRITE_PENDING;
+
+      // Immediately fail all write requests
+      failAllWrites(socketShutdownForWritesEx);
+      return;
+    }
+    case StateEnum::UNINIT:
+      // Callers normally shouldn't call shutdownWriteNow() before the socket
+      // even starts connecting.  Nonetheless, go ahead and set
+      // SHUT_WRITE_PENDING.  Once the socket eventually connects it will
+      // immediately shut down the write side of the socket.
+      shutdownFlags_ |= SHUT_WRITE_PENDING;
+      return;
+    case StateEnum::CLOSED:
+    case StateEnum::ERROR:
+      // We should never get here.  SHUT_WRITE should always be set
+      // in STATE_CLOSED and STATE_ERROR.
+      VLOG(4) << "AsyncSocket::shutdownWriteNow() (this=" << this
+                 << ", fd=" << fd_ << ") in unexpected state " << state_
+                 << " with SHUT_WRITE not set ("
+                 << std::hex << (int) shutdownFlags_ << ")";
+      assert(false);
+      return;
+  }
+
+  LOG(DFATAL) << "AsyncSocket::shutdownWriteNow() (this=" << this << ", fd="
+              << fd_ << ") called in unknown state " << state_;
+}
+
+bool AsyncSocket::readable() const {
+  if (fd_ == -1) {
+    return false;
+  }
+  struct pollfd fds[1];
+  fds[0].fd = fd_;
+  fds[0].events = POLLIN;
+  fds[0].revents = 0;
+  int rc = poll(fds, 1, 0);
+  return rc == 1;
+}
+
+bool AsyncSocket::isPending() const {
+  return ioHandler_.isPending();
+}
+
+bool AsyncSocket::hangup() const {
+  if (fd_ == -1) {
+    // sanity check, no one should ask for hangup if we are not connected.
+    assert(false);
+    return false;
+  }
+#ifdef POLLRDHUP // Linux-only
+  struct pollfd fds[1];
+  fds[0].fd = fd_;
+  fds[0].events = POLLRDHUP|POLLHUP;
+  fds[0].revents = 0;
+  poll(fds, 1, 0);
+  return (fds[0].revents & (POLLRDHUP|POLLHUP)) != 0;
+#else
+  return false;
+#endif
+}
+
+bool AsyncSocket::good() const {
+  return ((state_ == StateEnum::CONNECTING ||
+          state_ == StateEnum::ESTABLISHED) &&
+          (shutdownFlags_ == 0) && (eventBase_ != nullptr));
+}
+
+bool AsyncSocket::error() const {
+  return (state_ == StateEnum::ERROR);
+}
+
+void AsyncSocket::attachEventBase(EventBase* eventBase) {
+  VLOG(5) << "AsyncSocket::attachEventBase(this=" << this << ", fd=" << fd_
+          << ", old evb=" << eventBase_ << ", new evb=" << eventBase
+          << ", state=" << state_ << ", events="
+          << std::hex << eventFlags_ << ")";
+  assert(eventBase_ == nullptr);
+  assert(eventBase->isInEventBaseThread());
+
+  eventBase_ = eventBase;
+  ioHandler_.attachEventBase(eventBase);
+  writeTimeout_.attachEventBase(eventBase);
+}
+
+void AsyncSocket::detachEventBase() {
+  VLOG(5) << "AsyncSocket::detachEventBase(this=" << this << ", fd=" << fd_
+          << ", old evb=" << eventBase_ << ", state=" << state_
+          << ", events=" << std::hex << eventFlags_ << ")";
+  assert(eventBase_ != nullptr);
+  assert(eventBase_->isInEventBaseThread());
+
+  eventBase_ = nullptr;
+  ioHandler_.detachEventBase();
+  writeTimeout_.detachEventBase();
+}
+
+bool AsyncSocket::isDetachable() const {
+  DCHECK(eventBase_ != nullptr);
+  DCHECK(eventBase_->isInEventBaseThread());
+
+  return !ioHandler_.isHandlerRegistered() && !writeTimeout_.isScheduled();
+}
+
+void AsyncSocket::getLocalAddress(folly::SocketAddress* address) const {
+  address->setFromLocalAddress(fd_);
+}
+
+void AsyncSocket::getPeerAddress(folly::SocketAddress* address) const {
+  if (!addr_.isInitialized()) {
+    addr_.setFromPeerAddress(fd_);
+  }
+  *address = addr_;
+}
+
+int AsyncSocket::setNoDelay(bool noDelay) {
+  if (fd_ < 0) {
+    VLOG(4) << "AsyncSocket::setNoDelay() called on non-open socket "
+               << this << "(state=" << state_ << ")";
+    return EINVAL;
+
+  }
+
+  int value = noDelay ? 1 : 0;
+  if (setsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &value, sizeof(value)) != 0) {
+    int errnoCopy = errno;
+    VLOG(2) << "failed to update TCP_NODELAY option on AsyncSocket "
+            << this << " (fd=" << fd_ << ", state=" << state_ << "): "
+            << strerror(errnoCopy);
+    return errnoCopy;
+  }
+
+  return 0;
+}
+
+int AsyncSocket::setCongestionFlavor(const std::string &cname) {
+
+  #ifndef TCP_CONGESTION
+  #define TCP_CONGESTION  13
+  #endif
+
+  if (fd_ < 0) {
+    VLOG(4) << "AsyncSocket::setCongestionFlavor() called on non-open "
+               << "socket " << this << "(state=" << state_ << ")";
+    return EINVAL;
+
+  }
+
+  if (setsockopt(fd_, IPPROTO_TCP, TCP_CONGESTION, cname.c_str(),
+        cname.length() + 1) != 0) {
+    int errnoCopy = errno;
+    VLOG(2) << "failed to update TCP_CONGESTION option on AsyncSocket "
+            << this << "(fd=" << fd_ << ", state=" << state_ << "): "
+            << strerror(errnoCopy);
+    return errnoCopy;
+  }
+
+  return 0;
+}
+
+int AsyncSocket::setQuickAck(bool quickack) {
+  if (fd_ < 0) {
+    VLOG(4) << "AsyncSocket::setQuickAck() called on non-open socket "
+               << this << "(state=" << state_ << ")";
+    return EINVAL;
+
+  }
+
+#ifdef TCP_QUICKACK // Linux-only
+  int value = quickack ? 1 : 0;
+  if (setsockopt(fd_, IPPROTO_TCP, TCP_QUICKACK, &value, sizeof(value)) != 0) {
+    int errnoCopy = errno;
+    VLOG(2) << "failed to update TCP_QUICKACK option on AsyncSocket"
+            << this << "(fd=" << fd_ << ", state=" << state_ << "): "
+            << strerror(errnoCopy);
+    return errnoCopy;
+  }
+
+  return 0;
+#else
+  return ENOSYS;
+#endif
+}
+
+int AsyncSocket::setSendBufSize(size_t bufsize) {
+  if (fd_ < 0) {
+    VLOG(4) << "AsyncSocket::setSendBufSize() called on non-open socket "
+               << this << "(state=" << state_ << ")";
+    return EINVAL;
+  }
+
+  if (setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &bufsize, sizeof(bufsize)) !=0) {
+    int errnoCopy = errno;
+    VLOG(2) << "failed to update SO_SNDBUF option on AsyncSocket"
+            << this << "(fd=" << fd_ << ", state=" << state_ << "): "
+            << strerror(errnoCopy);
+    return errnoCopy;
+  }
+
+  return 0;
+}
+
+int AsyncSocket::setRecvBufSize(size_t bufsize) {
+  if (fd_ < 0) {
+    VLOG(4) << "AsyncSocket::setRecvBufSize() called on non-open socket "
+               << this << "(state=" << state_ << ")";
+    return EINVAL;
+  }
+
+  if (setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &bufsize, sizeof(bufsize)) !=0) {
+    int errnoCopy = errno;
+    VLOG(2) << "failed to update SO_RCVBUF option on AsyncSocket"
+            << this << "(fd=" << fd_ << ", state=" << state_ << "): "
+            << strerror(errnoCopy);
+    return errnoCopy;
+  }
+
+  return 0;
+}
+
+int AsyncSocket::setTCPProfile(int profd) {
+  if (fd_ < 0) {
+    VLOG(4) << "AsyncSocket::setTCPProfile() called on non-open socket "
+               << this << "(state=" << state_ << ")";
+    return EINVAL;
+  }
+
+  if (setsockopt(fd_, SOL_SOCKET, SO_SET_NAMESPACE, &profd, sizeof(int)) !=0) {
+    int errnoCopy = errno;
+    VLOG(2) << "failed to set socket namespace option on AsyncSocket"
+            << this << "(fd=" << fd_ << ", state=" << state_ << "): "
+            << strerror(errnoCopy);
+    return errnoCopy;
+  }
+
+  return 0;
+}
+
+void AsyncSocket::ioReady(uint16_t events) noexcept {
+  VLOG(7) << "AsyncSocket::ioRead() this=" << this << ", fd" << fd_
+          << ", events=" << std::hex << events << ", state=" << state_;
+  DestructorGuard dg(this);
+  assert(events & EventHandler::READ_WRITE);
+  assert(eventBase_->isInEventBaseThread());
+
+  uint16_t relevantEvents = events & EventHandler::READ_WRITE;
+  if (relevantEvents == EventHandler::READ) {
+    handleRead();
+  } else if (relevantEvents == EventHandler::WRITE) {
+    handleWrite();
+  } else if (relevantEvents == EventHandler::READ_WRITE) {
+    EventBase* originalEventBase = eventBase_;
+    // If both read and write events are ready, process writes first.
+    handleWrite();
+
+    // Return now if handleWrite() detached us from our EventBase
+    if (eventBase_ != originalEventBase) {
+      return;
+    }
+
+    // Only call handleRead() if a read callback is still installed.
+    // (It's possible that the read callback was uninstalled during
+    // handleWrite().)
+    if (readCallback_) {
+      handleRead();
+    }
+  } else {
+    VLOG(4) << "AsyncSocket::ioRead() called with unexpected events "
+               << std::hex << events << "(this=" << this << ")";
+    abort();
+  }
+}
+
+ssize_t AsyncSocket::performRead(void* buf, size_t buflen) {
+  ssize_t bytes = recv(fd_, buf, buflen, MSG_DONTWAIT);
+  if (bytes < 0) {
+    if (errno == EAGAIN || errno == EWOULDBLOCK) {
+      // No more data to read right now.
+      return READ_BLOCKING;
+    } else {
+      return READ_ERROR;
+    }
+  } else {
+    appBytesReceived_ += bytes;
+    return bytes;
+  }
+}
+
+void AsyncSocket::handleRead() noexcept {
+  VLOG(5) << "AsyncSocket::handleRead() this=" << this << ", fd=" << fd_
+          << ", state=" << state_;
+  assert(state_ == StateEnum::ESTABLISHED);
+  assert((shutdownFlags_ & SHUT_READ) == 0);
+  assert(readCallback_ != nullptr);
+  assert(eventFlags_ & EventHandler::READ);
+
+  // Loop until:
+  // - a read attempt would block
+  // - readCallback_ is uninstalled
+  // - the number of loop iterations exceeds the optional maximum
+  // - this AsyncSocket is moved to another EventBase
+  //
+  // When we invoke readDataAvailable() it may uninstall the readCallback_,
+  // which is why need to check for it here.
+  //
+  // The last bullet point is slightly subtle.  readDataAvailable() may also
+  // detach this socket from this EventBase.  However, before
+  // readDataAvailable() returns another thread may pick it up, attach it to
+  // a different EventBase, and install another readCallback_.  We need to
+  // exit immediately after readDataAvailable() returns if the eventBase_ has
+  // changed.  (The caller must perform some sort of locking to transfer the
+  // AsyncSocket between threads properly.  This will be sufficient to ensure
+  // that this thread sees the updated eventBase_ variable after
+  // readDataAvailable() returns.)
+  uint16_t numReads = 0;
+  EventBase* originalEventBase = eventBase_;
+  while (readCallback_ && eventBase_ == originalEventBase) {
+    // Get the buffer to read into.
+    void* buf = nullptr;
+    size_t buflen = 0;
+    try {
+      readCallback_->getReadBuffer(&buf, &buflen);
+    } catch (const AsyncSocketException& ex) {
+      return failRead(__func__, ex);
+    } catch (const std::exception& ex) {
+      AsyncSocketException tex(AsyncSocketException::BAD_ARGS,
+                              string("ReadCallback::getReadBuffer() "
+                                     "threw exception: ") +
+                              ex.what());
+      return failRead(__func__, tex);
+    } catch (...) {
+      AsyncSocketException ex(AsyncSocketException::BAD_ARGS,
+                             "ReadCallback::getReadBuffer() threw "
+                             "non-exception type");
+      return failRead(__func__, ex);
+    }
+    if (buf == nullptr || buflen == 0) {
+      AsyncSocketException ex(AsyncSocketException::BAD_ARGS,
+                             "ReadCallback::getReadBuffer() returned "
+                             "empty buffer");
+      return failRead(__func__, ex);
+    }
+
+    // Perform the read
+    ssize_t bytesRead = performRead(buf, buflen);
+    if (bytesRead > 0) {
+      readCallback_->readDataAvailable(bytesRead);
+      // Fall through and continue around the loop if the read
+      // completely filled the available buffer.
+      // Note that readCallback_ may have been uninstalled or changed inside
+      // readDataAvailable().
+      if (bytesRead < buflen) {
+        return;
+      }
+    } else if (bytesRead == READ_BLOCKING) {
+        // No more data to read right now.
+        return;
+    } else if (bytesRead == READ_ERROR) {
+      AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
+                             withAddr("recv() failed"), errno);
+      return failRead(__func__, ex);
+    } else {
+      assert(bytesRead == READ_EOF);
+      // EOF
+      shutdownFlags_ |= SHUT_READ;
+      if (!updateEventRegistration(0, EventHandler::READ)) {
+        // we've already been moved into STATE_ERROR
+        assert(state_ == StateEnum::ERROR);
+        assert(readCallback_ == nullptr);
+        return;
+      }
+
+      ReadCallback* callback = readCallback_;
+      readCallback_ = nullptr;
+      callback->readEOF();
+      return;
+    }
+    if (maxReadsPerEvent_ && (++numReads >= maxReadsPerEvent_)) {
+      return;
+    }
+  }
+}
+
+/**
+ * This function attempts to write as much data as possible, until no more data
+ * can be written.
+ *
+ * - If it sends all available data, it unregisters for write events, and stops
+ *   the writeTimeout_.
+ *
+ * - If not all of the data can be sent immediately, it reschedules
+ *   writeTimeout_ (if a non-zero timeout is set), and ensures the handler is
+ *   registered for write events.
+ */
+void AsyncSocket::handleWrite() noexcept {
+  VLOG(5) << "AsyncSocket::handleWrite() this=" << this << ", fd=" << fd_
+          << ", state=" << state_;
+  if (state_ == StateEnum::CONNECTING) {
+    handleConnect();
+    return;
+  }
+
+  // Normal write
+  assert(state_ == StateEnum::ESTABLISHED);
+  assert((shutdownFlags_ & SHUT_WRITE) == 0);
+  assert(writeReqHead_ != nullptr);
+
+  // Loop until we run out of write requests,
+  // or until this socket is moved to another EventBase.
+  // (See the comment in handleRead() explaining how this can happen.)
+  EventBase* originalEventBase = eventBase_;
+  while (writeReqHead_ != nullptr && eventBase_ == originalEventBase) {
+    uint32_t countWritten;
+    uint32_t partialWritten;
+    WriteFlags writeFlags = writeReqHead_->flags();
+    if (writeReqHead_->getNext() != nullptr) {
+      writeFlags = writeFlags | WriteFlags::CORK;
+    }
+    int bytesWritten = performWrite(writeReqHead_->getOps(),
+                                    writeReqHead_->getOpCount(),
+                                    writeFlags, &countWritten, &partialWritten);
+    if (bytesWritten < 0) {
+      AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
+                             withAddr("writev() failed"), errno);
+      return failWrite(__func__, ex);
+    } else if (countWritten == writeReqHead_->getOpCount()) {
+      // We finished this request
+      WriteRequest* req = writeReqHead_;
+      writeReqHead_ = req->getNext();
+
+      if (writeReqHead_ == nullptr) {
+        writeReqTail_ = nullptr;
+        // This is the last write request.
+        // Unregister for write events and cancel the send timer
+        // before we invoke the callback.  We have to update the state properly
+        // before calling the callback, since it may want to detach us from
+        // the EventBase.
+        if (eventFlags_ & EventHandler::WRITE) {
+          if (!updateEventRegistration(0, EventHandler::WRITE)) {
+            assert(state_ == StateEnum::ERROR);
+            return;
+          }
+          // Stop the send timeout
+          writeTimeout_.cancelTimeout();
+        }
+        assert(!writeTimeout_.isScheduled());
+
+        // If SHUT_WRITE_PENDING is set, we should shutdown the socket after
+        // we finish sending the last write request.
+        //
+        // We have to do this before invoking writeSuccess(), since
+        // writeSuccess() may detach us from our EventBase.
+        if (shutdownFlags_ & SHUT_WRITE_PENDING) {
+          assert(connectCallback_ == nullptr);
+          shutdownFlags_ |= SHUT_WRITE;
+
+          if (shutdownFlags_ & SHUT_READ) {
+            // Reads have already been shutdown.  Fully close the socket and
+            // move to STATE_CLOSED.
+            //
+            // Note: This code currently moves us to STATE_CLOSED even if
+            // close() hasn't ever been called.  This can occur if we have
+            // received EOF from the peer and shutdownWrite() has been called
+            // locally.  Should we bother staying in STATE_ESTABLISHED in this
+            // case, until close() is actually called?  I can't think of a
+            // reason why we would need to do so.  No other operations besides
+            // calling close() or destroying the socket can be performed at
+            // this point.
+            assert(readCallback_ == nullptr);
+            state_ = StateEnum::CLOSED;
+            if (fd_ >= 0) {
+              ioHandler_.changeHandlerFD(-1);
+              doClose();
+            }
+          } else {
+            // Reads are still enabled, so we are only doing a half-shutdown
+            ::shutdown(fd_, SHUT_WR);
+          }
+        }
+      }
+
+      // Invoke the callback
+      WriteCallback* callback = req->getCallback();
+      req->destroy();
+      if (callback) {
+        callback->writeSuccess();
+      }
+      // We'll continue around the loop, trying to write another request
+    } else {
+      // Partial write.
+      writeReqHead_->consume(countWritten, partialWritten, bytesWritten);
+      // Stop after a partial write; it's highly likely that a subsequent write
+      // attempt will just return EAGAIN.
+      //
+      // Ensure that we are registered for write events.
+      if ((eventFlags_ & EventHandler::WRITE) == 0) {
+        if (!updateEventRegistration(EventHandler::WRITE, 0)) {
+          assert(state_ == StateEnum::ERROR);
+          return;
+        }
+      }
+
+      // Reschedule the send timeout, since we have made some write progress.
+      if (sendTimeout_ > 0) {
+        if (!writeTimeout_.scheduleTimeout(sendTimeout_)) {
+          AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
+              withAddr("failed to reschedule write timeout"));
+          return failWrite(__func__, ex);
+        }
+      }
+      return;
+    }
+  }
+}
+
+void AsyncSocket::checkForImmediateRead() noexcept {
+  // We currently don't attempt to perform optimistic reads in AsyncSocket.
+  // (However, note that some subclasses do override this method.)
+  //
+  // Simply calling handleRead() here would be bad, as this would call
+  // readCallback_->getReadBuffer(), forcing the callback to allocate a read
+  // buffer even though no data may be available.  This would waste lots of
+  // memory, since the buffer will sit around unused until the socket actually
+  // becomes readable.
+  //
+  // Checking if the socket is readable now also seems like it would probably
+  // be a pessimism.  In most cases it probably wouldn't be readable, and we
+  // would just waste an extra system call.  Even if it is readable, waiting to
+  // find out from libevent on the next event loop doesn't seem that bad.
+}
+
+void AsyncSocket::handleInitialReadWrite() noexcept {
+  // Our callers should already be holding a DestructorGuard, but grab
+  // one here just to make sure, in case one of our calling code paths ever
+  // changes.
+  DestructorGuard dg(this);
+
+  // If we have a readCallback_, make sure we enable read events.  We
+  // may already be registered for reads if connectSuccess() set
+  // the read calback.
+  if (readCallback_ && !(eventFlags_ & EventHandler::READ)) {
+    assert(state_ == StateEnum::ESTABLISHED);
+    assert((shutdownFlags_ & SHUT_READ) == 0);
+    if (!updateEventRegistration(EventHandler::READ, 0)) {
+      assert(state_ == StateEnum::ERROR);
+      return;
+    }
+    checkForImmediateRead();
+  } else if (readCallback_ == nullptr) {
+    // Unregister for read events.
+    updateEventRegistration(0, EventHandler::READ);
+  }
+
+  // If we have write requests pending, try to send them immediately.
+  // Since we just finished accepting, there is a very good chance that we can
+  // write without blocking.
+  //
+  // However, we only process them if EventHandler::WRITE is not already set,
+  // which means that we're already blocked on a write attempt.  (This can
+  // happen if connectSuccess() called write() before returning.)
+  if (writeReqHead_ && !(eventFlags_ & EventHandler::WRITE)) {
+    // Call handleWrite() to perform write processing.
+    handleWrite();
+  } else if (writeReqHead_ == nullptr) {
+    // Unregister for write event.
+    updateEventRegistration(0, EventHandler::WRITE);
+  }
+}
+
+void AsyncSocket::handleConnect() noexcept {
+  VLOG(5) << "AsyncSocket::handleConnect() this=" << this << ", fd=" << fd_
+          << ", state=" << state_;
+  assert(state_ == StateEnum::CONNECTING);
+  // SHUT_WRITE can never be set while we are still connecting;
+  // SHUT_WRITE_PENDING may be set, be we only set SHUT_WRITE once the connect
+  // finishes
+  assert((shutdownFlags_ & SHUT_WRITE) == 0);
+
+  // In case we had a connect timeout, cancel the timeout
+  writeTimeout_.cancelTimeout();
+  // We don't use a persistent registration when waiting on a connect event,
+  // so we have been automatically unregistered now.  Update eventFlags_ to
+  // reflect reality.
+  assert(eventFlags_ == EventHandler::WRITE);
+  eventFlags_ = EventHandler::NONE;
+
+  // Call getsockopt() to check if the connect succeeded
+  int error;
+  socklen_t len = sizeof(error);
+  int rv = getsockopt(fd_, SOL_SOCKET, SO_ERROR, &error, &len);
+  if (rv != 0) {
+    AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
+                           withAddr("error calling getsockopt() after connect"),
+                           errno);
+    VLOG(4) << "AsyncSocket::handleConnect(this=" << this << ", fd="
+               << fd_ << " host=" << addr_.describe()
+               << ") exception:" << ex.what();
+    return failConnect(__func__, ex);
+  }
+
+  if (error != 0) {
+    AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
+                           "connect failed", error);
+    VLOG(1) << "AsyncSocket::handleConnect(this=" << this << ", fd="
+            << fd_ << " host=" << addr_.describe()
+            << ") exception: " << ex.what();
+    return failConnect(__func__, ex);
+  }
+
+  // Move into STATE_ESTABLISHED
+  state_ = StateEnum::ESTABLISHED;
+
+  // If SHUT_WRITE_PENDING is set and we don't have any write requests to
+  // perform, immediately shutdown the write half of the socket.
+  if ((shutdownFlags_ & SHUT_WRITE_PENDING) && writeReqHead_ == nullptr) {
+    // SHUT_READ shouldn't be set.  If close() is called on the socket while we
+    // are still connecting we just abort the connect rather than waiting for
+    // it to complete.
+    assert((shutdownFlags_ & SHUT_READ) == 0);
+    ::shutdown(fd_, SHUT_WR);
+    shutdownFlags_ |= SHUT_WRITE;
+  }
+
+  VLOG(7) << "AsyncSocket " << this << ": fd " << fd_
+          << "successfully connected; state=" << state_;
+
+  // Remember the EventBase we are attached to, before we start invoking any
+  // callbacks (since the callbacks may call detachEventBase()).
+  EventBase* originalEventBase = eventBase_;
+
+  // Call the connect callback.
+  if (connectCallback_) {
+    ConnectCallback* callback = connectCallback_;
+    connectCallback_ = nullptr;
+    callback->connectSuccess();
+  }
+
+  // Note that the connect callback may have changed our state.
+  // (set or unset the read callback, called write(), closed the socket, etc.)
+  // The following code needs to handle these situations correctly.
+  //
+  // If the socket has been closed, readCallback_ and writeReqHead_ will
+  // always be nullptr, so that will prevent us from trying to read or write.
+  //
+  // The main thing to check for is if eventBase_ is still originalEventBase.
+  // If not, we have been detached from this event base, so we shouldn't
+  // perform any more operations.
+  if (eventBase_ != originalEventBase) {
+    return;
+  }
+
+  handleInitialReadWrite();
+}
+
+void AsyncSocket::timeoutExpired() noexcept {
+  VLOG(7) << "AsyncSocket " << this << ", fd " << fd_ << ": timeout expired: "
+          << "state=" << state_ << ", events=" << std::hex << eventFlags_;
+  DestructorGuard dg(this);
+  assert(eventBase_->isInEventBaseThread());
+
+  if (state_ == StateEnum::CONNECTING) {
+    // connect() timed out
+    // Unregister for I/O events.
+    AsyncSocketException ex(AsyncSocketException::TIMED_OUT,
+                           "connect timed out");
+    failConnect(__func__, ex);
+  } else {
+    // a normal write operation timed out
+    assert(state_ == StateEnum::ESTABLISHED);
+    AsyncSocketException ex(AsyncSocketException::TIMED_OUT, "write timed out");
+    failWrite(__func__, ex);
+  }
+}
+
+ssize_t AsyncSocket::performWrite(const iovec* vec,
+                                   uint32_t count,
+                                   WriteFlags flags,
+                                   uint32_t* countWritten,
+                                   uint32_t* partialWritten) {
+  // We use sendmsg() instead of writev() so that we can pass in MSG_NOSIGNAL
+  // We correctly handle EPIPE errors, so we never want to receive SIGPIPE
+  // (since it may terminate the program if the main program doesn't explicitly
+  // ignore it).
+  struct msghdr msg;
+  msg.msg_name = nullptr;
+  msg.msg_namelen = 0;
+  msg.msg_iov = const_cast<iovec *>(vec);
+#ifdef IOV_MAX // not defined on Android
+  msg.msg_iovlen = std::min(count, (uint32_t)IOV_MAX);
+#else
+  msg.msg_iovlen = std::min(count, (uint32_t)UIO_MAXIOV);
+#endif
+  msg.msg_control = nullptr;
+  msg.msg_controllen = 0;
+  msg.msg_flags = 0;
+
+  int msg_flags = MSG_DONTWAIT;
+
+#ifdef MSG_NOSIGNAL // Linux-only
+  msg_flags |= MSG_NOSIGNAL;
+  if (isSet(flags, WriteFlags::CORK)) {
+    // MSG_MORE tells the kernel we have more data to send, so wait for us to
+    // give it the rest of the data rather than immediately sending a partial
+    // frame, even when TCP_NODELAY is enabled.
+    msg_flags |= MSG_MORE;
+  }
+#endif
+  if (isSet(flags, WriteFlags::EOR)) {
+    // marks that this is the last byte of a record (response)
+    msg_flags |= MSG_EOR;
+  }
+  ssize_t totalWritten = ::sendmsg(fd_, &msg, msg_flags);
+  if (totalWritten < 0) {
+    if (errno == EAGAIN) {
+      // TCP buffer is full; we can't write any more data right now.
+      *countWritten = 0;
+      *partialWritten = 0;
+      return 0;
+    }
+    // error
+    *countWritten = 0;
+    *partialWritten = 0;
+    return -1;
+  }
+
+  appBytesWritten_ += totalWritten;
+
+  uint32_t bytesWritten;
+  uint32_t n;
+  for (bytesWritten = totalWritten, n = 0; n < count; ++n) {
+    const iovec* v = vec + n;
+    if (v->iov_len > bytesWritten) {
+      // Partial write finished in the middle of this iovec
+      *countWritten = n;
+      *partialWritten = bytesWritten;
+      return totalWritten;
+    }
+
+    bytesWritten -= v->iov_len;
+  }
+
+  assert(bytesWritten == 0);
+  *countWritten = n;
+  *partialWritten = 0;
+  return totalWritten;
+}
+
+/**
+ * Re-register the EventHandler after eventFlags_ has changed.
+ *
+ * If an error occurs, fail() is called to move the socket into the error state
+ * and call all currently installed callbacks.  After an error, the
+ * AsyncSocket is completely unregistered.
+ *
+ * @return Returns true on succcess, or false on error.
+ */
+bool AsyncSocket::updateEventRegistration() {
+  VLOG(5) << "AsyncSocket::updateEventRegistration(this=" << this
+          << ", fd=" << fd_ << ", evb=" << eventBase_ << ", state=" << state_
+          << ", events=" << std::hex << eventFlags_;
+  assert(eventBase_->isInEventBaseThread());
+  if (eventFlags_ == EventHandler::NONE) {
+    ioHandler_.unregisterHandler();
+    return true;
+  }
+
+  // Always register for persistent events, so we don't have to re-register
+  // after being called back.
+  if (!ioHandler_.registerHandler(eventFlags_ | EventHandler::PERSIST)) {
+    eventFlags_ = EventHandler::NONE; // we're not registered after error
+    AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
+        withAddr("failed to update AsyncSocket event registration"));
+    fail("updateEventRegistration", ex);
+    return false;
+  }
+
+  return true;
+}
+
+bool AsyncSocket::updateEventRegistration(uint16_t enable,
+                                           uint16_t disable) {
+  uint16_t oldFlags = eventFlags_;
+  eventFlags_ |= enable;
+  eventFlags_ &= ~disable;
+  if (eventFlags_ == oldFlags) {
+    return true;
+  } else {
+    return updateEventRegistration();
+  }
+}
+
+void AsyncSocket::startFail() {
+  // startFail() should only be called once
+  assert(state_ != StateEnum::ERROR);
+  assert(getDestructorGuardCount() > 0);
+  state_ = StateEnum::ERROR;
+  // Ensure that SHUT_READ and SHUT_WRITE are set,
+  // so all future attempts to read or write will be rejected
+  shutdownFlags_ |= (SHUT_READ | SHUT_WRITE);
+
+  if (eventFlags_ != EventHandler::NONE) {
+    eventFlags_ = EventHandler::NONE;
+    ioHandler_.unregisterHandler();
+  }
+  writeTimeout_.cancelTimeout();
+
+  if (fd_ >= 0) {
+    ioHandler_.changeHandlerFD(-1);
+    doClose();
+  }
+}
+
+void AsyncSocket::finishFail() {
+  assert(state_ == StateEnum::ERROR);
+  assert(getDestructorGuardCount() > 0);
+
+  AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
+                         withAddr("socket closing after error"));
+  if (connectCallback_) {
+    ConnectCallback* callback = connectCallback_;
+    connectCallback_ = nullptr;
+    callback->connectErr(ex);
+  }
+
+  failAllWrites(ex);
+
+  if (readCallback_) {
+    ReadCallback* callback = readCallback_;
+    readCallback_ = nullptr;
+    callback->readErr(ex);
+  }
+}
+
+void AsyncSocket::fail(const char* fn, const AsyncSocketException& ex) {
+  VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
+             << state_ << " host=" << addr_.describe()
+             << "): failed in " << fn << "(): "
+             << ex.what();
+  startFail();
+  finishFail();
+}
+
+void AsyncSocket::failConnect(const char* fn, const AsyncSocketException& ex) {
+  VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
+               << state_ << " host=" << addr_.describe()
+               << "): failed while connecting in " << fn << "(): "
+               << ex.what();
+  startFail();
+
+  if (connectCallback_ != nullptr) {
+    ConnectCallback* callback = connectCallback_;
+    connectCallback_ = nullptr;
+    callback->connectErr(ex);
+  }
+
+  finishFail();
+}
+
+void AsyncSocket::failRead(const char* fn, const AsyncSocketException& ex) {
+  VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
+               << state_ << " host=" << addr_.describe()
+               << "): failed while reading in " << fn << "(): "
+               << ex.what();
+  startFail();
+
+  if (readCallback_ != nullptr) {
+    ReadCallback* callback = readCallback_;
+    readCallback_ = nullptr;
+    callback->readErr(ex);
+  }
+
+  finishFail();
+}
+
+void AsyncSocket::failWrite(const char* fn, const AsyncSocketException& ex) {
+  VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
+               << state_ << " host=" << addr_.describe()
+               << "): failed while writing in " << fn << "(): "
+               << ex.what();
+  startFail();
+
+  // Only invoke the first write callback, since the error occurred while
+  // writing this request.  Let any other pending write callbacks be invoked in
+  // finishFail().
+  if (writeReqHead_ != nullptr) {
+    WriteRequest* req = writeReqHead_;
+    writeReqHead_ = req->getNext();
+    WriteCallback* callback = req->getCallback();
+    uint32_t bytesWritten = req->getBytesWritten();
+    req->destroy();
+    if (callback) {
+      callback->writeErr(bytesWritten, ex);
+    }
+  }
+
+  finishFail();
+}
+
+void AsyncSocket::failWrite(const char* fn, WriteCallback* callback,
+                             size_t bytesWritten,
+                             const AsyncSocketException& ex) {
+  // This version of failWrite() is used when the failure occurs before
+  // we've added the callback to writeReqHead_.
+  VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
+             << state_ << " host=" << addr_.describe()
+             <<"): failed while writing in " << fn << "(): "
+             << ex.what();
+  startFail();
+
+  if (callback != nullptr) {
+    callback->writeErr(bytesWritten, ex);
+  }
+
+  finishFail();
+}
+
+void AsyncSocket::failAllWrites(const AsyncSocketException& ex) {
+  // Invoke writeError() on all write callbacks.
+  // This is used when writes are forcibly shutdown with write requests
+  // pending, or when an error occurs with writes pending.
+  while (writeReqHead_ != nullptr) {
+    WriteRequest* req = writeReqHead_;
+    writeReqHead_ = req->getNext();
+    WriteCallback* callback = req->getCallback();
+    if (callback) {
+      callback->writeErr(req->getBytesWritten(), ex);
+    }
+    req->destroy();
+  }
+}
+
+void AsyncSocket::invalidState(ConnectCallback* callback) {
+  VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_
+             << "): connect() called in invalid state " << state_;
+
+  /*
+   * The invalidState() methods don't use the normal failure mechanisms,
+   * since we don't know what state we are in.  We don't want to call
+   * startFail()/finishFail() recursively if we are already in the middle of
+   * cleaning up.
+   */
+
+  AsyncSocketException ex(AsyncSocketException::ALREADY_OPEN,
+                         "connect() called with socket in invalid state");
+  if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
+    if (callback) {
+      callback->connectErr(ex);
+    }
+  } else {
+    // We can't use failConnect() here since connectCallback_
+    // may already be set to another callback.  Invoke this ConnectCallback
+    // here; any other connectCallback_ will be invoked in finishFail()
+    startFail();
+    if (callback) {
+      callback->connectErr(ex);
+    }
+    finishFail();
+  }
+}
+
+void AsyncSocket::invalidState(ReadCallback* callback) {
+  VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_
+             << "): setReadCallback(" << callback
+             << ") called in invalid state " << state_;
+
+  AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
+                         "setReadCallback() called with socket in "
+                         "invalid state");
+  if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
+    if (callback) {
+      callback->readErr(ex);
+    }
+  } else {
+    startFail();
+    if (callback) {
+      callback->readErr(ex);
+    }
+    finishFail();
+  }
+}
+
+void AsyncSocket::invalidState(WriteCallback* callback) {
+  VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_
+             << "): write() called in invalid state " << state_;
+
+  AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
+                         withAddr("write() called with socket in invalid state"));
+  if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
+    if (callback) {
+      callback->writeErr(0, ex);
+    }
+  } else {
+    startFail();
+    if (callback) {
+      callback->writeErr(0, ex);
+    }
+    finishFail();
+  }
+}
+
+void AsyncSocket::doClose() {
+  if (fd_ == -1) return;
+  if (shutdownSocketSet_) {
+    shutdownSocketSet_->close(fd_);
+  } else {
+    ::close(fd_);
+  }
+  fd_ = -1;
+}
+
+std::ostream& operator << (std::ostream& os,
+                           const AsyncSocket::StateEnum& state) {
+  os << static_cast<int>(state);
+  return os;
+}
+
+std::string AsyncSocket::withAddr(const std::string& s) {
+  // Don't use addr_ directly because it may not be initialized
+  // e.g. if constructed from fd
+  folly::SocketAddress peer, local;
+  try {
+    getPeerAddress(&peer);
+    getLocalAddress(&local);
+  } catch (const std::exception&) {
+    // ignore
+  } catch (...) {
+    // ignore
+  }
+  return s + " (peer=" + peer.describe() + ", local=" + local.describe() + ")";
+}
+
+} // folly
diff --git a/folly/io/async/AsyncSocket.h b/folly/io/async/AsyncSocket.h
new file mode 100644 (file)
index 0000000..77bd2b0
--- /dev/null
@@ -0,0 +1,766 @@
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <glog/logging.h>
+#include <folly/SocketAddress.h>
+#include <folly/io/ShutdownSocketSet.h>
+#include <folly/io/IOBuf.h>
+#include <folly/io/async/AsyncTimeout.h>
+#include <folly/io/async/AsyncSocketException.h>
+#include <folly/io/async/AsyncTransport.h>
+#include <folly/io/async/EventHandler.h>
+#include <folly/io/async/DelayedDestruction.h>
+
+#include <memory>
+#include <map>
+
+namespace folly {
+
+/**
+ * A class for performing asynchronous I/O on a socket.
+ *
+ * AsyncSocket allows users to asynchronously wait for data on a socket, and
+ * to asynchronously send data.
+ *
+ * The APIs for reading and writing are intentionally asymmetric.  Waiting for
+ * data to read is a persistent API: a callback is installed, and is notified
+ * whenever new data is available.  It continues to be notified of new events
+ * until it is uninstalled.
+ *
+ * AsyncSocket does not provide read timeout functionality, because it
+ * typically cannot determine when the timeout should be active.  Generally, a
+ * timeout should only be enabled when processing is blocked waiting on data
+ * from the remote endpoint.  For server sockets, the timeout should not be
+ * active if the server is currently processing one or more outstanding
+ * requests for this socket.  For client sockets, the timeout should not be
+ * active if there are no requests pending on the socket.  Additionally, if a
+ * client has multiple pending requests, it will ususally want a separate
+ * timeout for each request, rather than a single read timeout.
+ *
+ * The write API is fairly intuitive: a user can request to send a block of
+ * data, and a callback will be informed once the entire block has been
+ * transferred to the kernel, or on error.  AsyncSocket does provide a send
+ * timeout, since most callers want to give up if the remote end stops
+ * responding and no further progress can be made sending the data.
+ */
+
+class AsyncSocket : virtual public AsyncTransport {
+ public:
+  typedef std::unique_ptr<AsyncSocket, Destructor> UniquePtr;
+
+  class ConnectCallback {
+   public:
+    virtual ~ConnectCallback() {}
+
+    /**
+     * connectSuccess() will be invoked when the connection has been
+     * successfully established.
+     */
+    virtual void connectSuccess() noexcept = 0;
+
+    /**
+     * connectErr() will be invoked if the connection attempt fails.
+     *
+     * @param ex        An exception describing the error that occurred.
+     */
+    virtual void connectErr(const AsyncSocketException& ex)
+      noexcept = 0;
+  };
+
+  class ReadCallback {
+   public:
+    virtual ~ReadCallback() {}
+
+    /**
+     * When data becomes available, getReadBuffer() will be invoked to get the
+     * buffer into which data should be read.
+     *
+     * This method allows the ReadCallback to delay buffer allocation until
+     * data becomes available.  This allows applications to manage large
+     * numbers of idle connections, without having to maintain a separate read
+     * buffer for each idle connection.
+     *
+     * It is possible that in some cases, getReadBuffer() may be called
+     * multiple times before readDataAvailable() is invoked.  In this case, the
+     * data will be written to the buffer returned from the most recent call to
+     * readDataAvailable().  If the previous calls to readDataAvailable()
+     * returned different buffers, the ReadCallback is responsible for ensuring
+     * that they are not leaked.
+     *
+     * If getReadBuffer() throws an exception, returns a nullptr buffer, or
+     * returns a 0 length, the ReadCallback will be uninstalled and its
+     * readError() method will be invoked.
+     *
+     * getReadBuffer() is not allowed to change the transport state before it
+     * returns.  (For example, it should never uninstall the read callback, or
+     * set a different read callback.)
+     *
+     * @param bufReturn getReadBuffer() should update *bufReturn to contain the
+     *                  address of the read buffer.  This parameter will never
+     *                  be nullptr.
+     * @param lenReturn getReadBuffer() should update *lenReturn to contain the
+     *                  maximum number of bytes that may be written to the read
+     *                  buffer.  This parameter will never be nullptr.
+     */
+    virtual void getReadBuffer(void** bufReturn, size_t* lenReturn) = 0;
+
+    /**
+     * readDataAvailable() will be invoked when data has been successfully read
+     * into the buffer returned by the last call to getReadBuffer().
+     *
+     * The read callback remains installed after readDataAvailable() returns.
+     * It must be explicitly uninstalled to stop receiving read events.
+     * getReadBuffer() will be called at least once before each call to
+     * readDataAvailable().  getReadBuffer() will also be called before any
+     * call to readEOF().
+     *
+     * @param len       The number of bytes placed in the buffer.
+     */
+    virtual void readDataAvailable(size_t len) noexcept = 0;
+
+    /**
+     * readEOF() will be invoked when the transport is closed.
+     *
+     * The read callback will be automatically uninstalled immediately before
+     * readEOF() is invoked.
+     */
+    virtual void readEOF() noexcept = 0;
+
+    /**
+     * readError() will be invoked if an error occurs reading from the
+     * transport.
+     *
+     * The read callback will be automatically uninstalled immediately before
+     * readError() is invoked.
+     *
+     * @param ex        An exception describing the error that occurred.
+     */
+    virtual void readErr(const AsyncSocketException& ex)
+      noexcept = 0;
+  };
+
+  class WriteCallback {
+   public:
+    virtual ~WriteCallback() {}
+
+    /**
+     * writeSuccess() will be invoked when all of the data has been
+     * successfully written.
+     *
+     * Note that this mainly signals that the buffer containing the data to
+     * write is no longer needed and may be freed or re-used.  It does not
+     * guarantee that the data has been fully transmitted to the remote
+     * endpoint.  For example, on socket-based transports, writeSuccess() only
+     * indicates that the data has been given to the kernel for eventual
+     * transmission.
+     */
+    virtual void writeSuccess() noexcept = 0;
+
+    /**
+     * writeError() will be invoked if an error occurs writing the data.
+     *
+     * @param bytesWritten      The number of bytes that were successfull
+     * @param ex                An exception describing the error that occurred.
+     */
+    virtual void writeErr(size_t bytesWritten,
+                            const AsyncSocketException& ex)
+      noexcept = 0;
+  };
+
+  /**
+   * Create a new unconnected AsyncSocket.
+   *
+   * connect() must later be called on this socket to establish a connection.
+   */
+  explicit AsyncSocket(EventBase* evb);
+
+  void setShutdownSocketSet(ShutdownSocketSet* ss);
+
+  /**
+   * Create a new AsyncSocket and begin the connection process.
+   *
+   * @param evb             EventBase that will manage this socket.
+   * @param address         The address to connect to.
+   * @param connectTimeout  Optional timeout in milliseconds for the connection
+   *                        attempt.
+   */
+  AsyncSocket(EventBase* evb,
+               const folly::SocketAddress& address,
+               uint32_t connectTimeout = 0);
+
+  /**
+   * Create a new AsyncSocket and begin the connection process.
+   *
+   * @param evb             EventBase that will manage this socket.
+   * @param ip              IP address to connect to (dotted-quad).
+   * @param port            Destination port in host byte order.
+   * @param connectTimeout  Optional timeout in milliseconds for the connection
+   *                        attempt.
+   */
+  AsyncSocket(EventBase* evb,
+               const std::string& ip,
+               uint16_t port,
+               uint32_t connectTimeout = 0);
+
+  /**
+   * Create a AsyncSocket from an already connected socket file descriptor.
+   *
+   * Note that while AsyncSocket enables TCP_NODELAY for sockets it creates
+   * when connecting, it does not change the socket options when given an
+   * existing file descriptor.  If callers want TCP_NODELAY enabled when using
+   * this version of the constructor, they need to explicitly call
+   * setNoDelay(true) after the constructor returns.
+   *
+   * @param evb EventBase that will manage this socket.
+   * @param fd  File descriptor to take over (should be a connected socket).
+   */
+  AsyncSocket(EventBase* evb, int fd);
+
+  /**
+   * Helper function to create a shared_ptr<AsyncSocket>.
+   *
+   * This passes in the correct destructor object, since AsyncSocket's
+   * destructor is protected and cannot be invoked directly.
+   */
+  static std::shared_ptr<AsyncSocket> newSocket(EventBase* evb) {
+    return std::shared_ptr<AsyncSocket>(new AsyncSocket(evb),
+                                           Destructor());
+  }
+
+  /**
+   * Helper function to create a shared_ptr<AsyncSocket>.
+   */
+  static std::shared_ptr<AsyncSocket> newSocket(
+      EventBase* evb,
+      const folly::SocketAddress& address,
+      uint32_t connectTimeout = 0) {
+    return std::shared_ptr<AsyncSocket>(
+        new AsyncSocket(evb, address, connectTimeout),
+        Destructor());
+  }
+
+  /**
+   * Helper function to create a shared_ptr<AsyncSocket>.
+   */
+  static std::shared_ptr<AsyncSocket> newSocket(
+      EventBase* evb,
+      const std::string& ip,
+      uint16_t port,
+      uint32_t connectTimeout = 0) {
+    return std::shared_ptr<AsyncSocket>(
+        new AsyncSocket(evb, ip, port, connectTimeout),
+        Destructor());
+  }
+
+  /**
+   * Helper function to create a shared_ptr<AsyncSocket>.
+   */
+  static std::shared_ptr<AsyncSocket> newSocket(EventBase* evb, int fd) {
+    return std::shared_ptr<AsyncSocket>(new AsyncSocket(evb, fd),
+                                           Destructor());
+  }
+
+  /**
+   * Destroy the socket.
+   *
+   * AsyncSocket::destroy() must be called to destroy the socket.
+   * The normal destructor is private, and should not be invoked directly.
+   * This prevents callers from deleting a AsyncSocket while it is invoking a
+   * callback.
+   */
+  virtual void destroy();
+
+  /**
+   * Get the EventBase used by this socket.
+   */
+  EventBase* getEventBase() const override {
+    return eventBase_;
+  }
+
+  /**
+   * Get the file descriptor used by the AsyncSocket.
+   */
+  virtual int getFd() const {
+    return fd_;
+  }
+
+  /**
+   * Extract the file descriptor from the AsyncSocket.
+   *
+   * This will immediately cause any installed callbacks to be invoked with an
+   * error.  The AsyncSocket may no longer be used after the file descriptor
+   * has been extracted.
+   *
+   * Returns the file descriptor.  The caller assumes ownership of the
+   * descriptor, and it will not be closed when the AsyncSocket is destroyed.
+   */
+  virtual int detachFd();
+
+  /**
+   * Uniquely identifies a handle to a socket option value. Each
+   * combination of level and option name corresponds to one socket
+   * option value.
+   */
+  class OptionKey {
+   public:
+    bool operator<(const OptionKey& other) const {
+      if (level == other.level) {
+        return optname < other.optname;
+      }
+      return level < other.level;
+    }
+    int apply(int fd, int val) const {
+      return setsockopt(fd, level, optname, &val, sizeof(val));
+    }
+    int level;
+    int optname;
+  };
+
+  // Maps from a socket option key to its value
+  typedef std::map<OptionKey, int> OptionMap;
+
+  static const OptionMap emptyOptionMap;
+  static const folly::SocketAddress anyAddress;
+
+  /**
+   * Initiate a connection.
+   *
+   * @param callback  The callback to inform when the connection attempt
+   *                  completes.
+   * @param address   The address to connect to.
+   * @param timeout   A timeout value, in milliseconds.  If the connection
+   *                  does not succeed within this period,
+   *                  callback->connectError() will be invoked.
+   */
+  virtual void connect(ConnectCallback* callback,
+               const folly::SocketAddress& address,
+               int timeout = 0,
+               const OptionMap &options = emptyOptionMap,
+               const folly::SocketAddress& bindAddr = anyAddress
+               ) noexcept;
+  void connect(ConnectCallback* callback, const std::string& ip, uint16_t port,
+               int timeout = 00,
+               const OptionMap &options = emptyOptionMap) noexcept;
+
+  /**
+   * Set the send timeout.
+   *
+   * If write requests do not make any progress for more than the specified
+   * number of milliseconds, fail all pending writes and close the socket.
+   *
+   * If write requests are currently pending when setSendTimeout() is called,
+   * the timeout interval is immediately restarted using the new value.
+   *
+   * (See the comments for AsyncSocket for an explanation of why AsyncSocket
+   * provides setSendTimeout() but not setRecvTimeout().)
+   *
+   * @param milliseconds  The timeout duration, in milliseconds.  If 0, no
+   *                      timeout will be used.
+   */
+  void setSendTimeout(uint32_t milliseconds) override;
+
+  /**
+   * Get the send timeout.
+   *
+   * @return Returns the current send timeout, in milliseconds.  A return value
+   *         of 0 indicates that no timeout is set.
+   */
+  uint32_t getSendTimeout() const override {
+    return sendTimeout_;
+  }
+
+  /**
+   * Set the maximum number of reads to execute from the underlying
+   * socket each time the EventBase detects that new ingress data is
+   * available. The default is unlimited, but callers can use this method
+   * to limit the amount of data read from the socket per event loop
+   * iteration.
+   *
+   * @param maxReads  Maximum number of reads per data-available event;
+   *                  a value of zero means unlimited.
+   */
+  void setMaxReadsPerEvent(uint16_t maxReads) {
+    maxReadsPerEvent_ = maxReads;
+  }
+
+  /**
+   * Get the maximum number of reads this object will execute from
+   * the underlying socket each time the EventBase detects that new
+   * ingress data is available.
+   *
+   * @returns Maximum number of reads per data-available event; a value
+   *          of zero means unlimited.
+   */
+  uint16_t getMaxReadsPerEvent() const {
+    return maxReadsPerEvent_;
+  }
+
+  // Read and write methods
+  void setReadCB(ReadCallback* callback);
+  ReadCallback* getReadCallback() const;
+
+  void write(WriteCallback* callback, const void* buf, size_t bytes,
+             WriteFlags flags = WriteFlags::NONE);
+  void writev(WriteCallback* callback, const iovec* vec, size_t count,
+              WriteFlags flags = WriteFlags::NONE);
+  void writeChain(WriteCallback* callback,
+                  std::unique_ptr<folly::IOBuf>&& buf,
+                  WriteFlags flags = WriteFlags::NONE);
+
+  // Methods inherited from AsyncTransport
+  void close() override;
+  void closeNow() override;
+  void closeWithReset() override;
+  void shutdownWrite() override;
+  void shutdownWriteNow() override;
+
+  bool readable() const override;
+  bool isPending() const override;
+  virtual bool hangup() const;
+  bool good() const override;
+  bool error() const override;
+  void attachEventBase(EventBase* eventBase) override;
+  void detachEventBase() override;
+  bool isDetachable() const override;
+
+  void getLocalAddress(
+    folly::SocketAddress* address) const override;
+  void getPeerAddress(
+    folly::SocketAddress* address) const override;
+
+  bool isEorTrackingEnabled() const override { return false; }
+
+  void setEorTracking(bool track) override {}
+
+  bool connecting() const override {
+    return (state_ == StateEnum::CONNECTING);
+  }
+
+  size_t getAppBytesWritten() const override {
+    return appBytesWritten_;
+  }
+
+  size_t getRawBytesWritten() const override {
+    return getAppBytesWritten();
+  }
+
+  size_t getAppBytesReceived() const override {
+    return appBytesReceived_;
+  }
+
+  size_t getRawBytesReceived() const override {
+    return getAppBytesReceived();
+  }
+
+  // Methods controlling socket options
+
+  /**
+   * Force writes to be transmitted immediately.
+   *
+   * This controls the TCP_NODELAY socket option.  When enabled, TCP segments
+   * are sent as soon as possible, even if it is not a full frame of data.
+   * When disabled, the data may be buffered briefly to try and wait for a full
+   * frame of data.
+   *
+   * By default, TCP_NODELAY is enabled for AsyncSocket objects.
+   *
+   * This method will fail if the socket is not currently open.
+   *
+   * @return Returns 0 if the TCP_NODELAY flag was successfully updated,
+   *         or a non-zero errno value on error.
+   */
+  int setNoDelay(bool noDelay);
+
+  /*
+   * Set the Flavor of Congestion Control to be used for this Socket
+   * Please check '/lib/modules/<kernel>/kernel/net/ipv4' for tcp_*.ko
+   * first to make sure the module is available for plugging in
+   * Alternatively you can choose from net.ipv4.tcp_allowed_congestion_control
+   */
+  int setCongestionFlavor(const std::string &cname);
+
+  /*
+   * Forces ACKs to be sent immediately
+   *
+   * @return Returns 0 if the TCP_QUICKACK flag was successfully updated,
+   *         or a non-zero errno value on error.
+   */
+  int setQuickAck(bool quickack);
+
+  /**
+   * Set the send bufsize
+   */
+  int setSendBufSize(size_t bufsize);
+
+  /**
+   * Set the recv bufsize
+   */
+  int setRecvBufSize(size_t bufsize);
+
+  /**
+   * Sets a specific tcp personality
+   * Available only on kernels 3.2 and greater
+   */
+  #define SO_SET_NAMESPACE        41
+  int setTCPProfile(int profd);
+
+
+  /**
+   * Generic API for reading a socket option.
+   *
+   * @param level     same as the "level" parameter in getsockopt().
+   * @param optname   same as the "optname" parameter in getsockopt().
+   * @param optval    pointer to the variable in which the option value should
+   *                  be returned.
+   * @return          same as the return value of getsockopt().
+   */
+  template <typename T>
+  int  getSockOpt(int level, int optname, T *optval) {
+    return getsockopt(fd_, level, optname, optval, sizeof(T));
+  }
+
+  /**
+   * Generic API for setting a socket option.
+   *
+   * @param level     same as the "level" parameter in getsockopt().
+   * @param optname   same as the "optname" parameter in getsockopt().
+   * @param optval    the option value to set.
+   * @return          same as the return value of setsockopt().
+   */
+  template <typename T>
+  int setSockOpt(int  level,  int  optname,  const T *optval) {
+    return setsockopt(fd_, level, optname, optval, sizeof(T));
+  }
+
+ protected:
+  enum ReadResultEnum {
+    READ_EOF = 0,
+    READ_ERROR = -1,
+    READ_BLOCKING = -2,
+  };
+
+  /**
+   * Protected destructor.
+   *
+   * Users of AsyncSocket must never delete it directly.  Instead, invoke
+   * destroy() instead.  (See the documentation in DelayedDestruction.h for
+   * more details.)
+   */
+  ~AsyncSocket();
+
+  enum class StateEnum : uint8_t {
+    UNINIT,
+    CONNECTING,
+    ESTABLISHED,
+    CLOSED,
+    ERROR
+  };
+
+  friend std::ostream& operator << (std::ostream& os, const StateEnum& state);
+
+  enum ShutdownFlags {
+    /// shutdownWrite() called, but we are still waiting on writes to drain
+    SHUT_WRITE_PENDING = 0x01,
+    /// writes have been completely shut down
+    SHUT_WRITE = 0x02,
+    /**
+     * Reads have been shutdown.
+     *
+     * At the moment we don't distinguish between remote read shutdown
+     * (received EOF from the remote end) and local read shutdown.  We can
+     * only receive EOF when a read callback is set, and we immediately inform
+     * it of the EOF.  Therefore there doesn't seem to be any reason to have a
+     * separate state of "received EOF but the local side may still want to
+     * read".
+     *
+     * We also don't currently provide any API for only shutting down the read
+     * side of a socket.  (This is a no-op as far as TCP is concerned, anyway.)
+     */
+    SHUT_READ = 0x04,
+  };
+
+  class WriteRequest;
+
+  class WriteTimeout : public AsyncTimeout {
+   public:
+    WriteTimeout(AsyncSocket* socket, EventBase* eventBase)
+      : AsyncTimeout(eventBase)
+      , socket_(socket) {}
+
+    virtual void timeoutExpired() noexcept {
+      socket_->timeoutExpired();
+    }
+
+   private:
+    AsyncSocket* socket_;
+  };
+
+  class IoHandler : public EventHandler {
+   public:
+    IoHandler(AsyncSocket* socket, EventBase* eventBase)
+      : EventHandler(eventBase, -1)
+      , socket_(socket) {}
+    IoHandler(AsyncSocket* socket, EventBase* eventBase, int fd)
+      : EventHandler(eventBase, fd)
+      , socket_(socket) {}
+
+    virtual void handlerReady(uint16_t events) noexcept {
+      socket_->ioReady(events);
+    }
+
+   private:
+    AsyncSocket* socket_;
+  };
+
+  void init();
+
+  // event notification methods
+  void ioReady(uint16_t events) noexcept;
+  virtual void checkForImmediateRead() noexcept;
+  virtual void handleInitialReadWrite() noexcept;
+  virtual void handleRead() noexcept;
+  virtual void handleWrite() noexcept;
+  virtual void handleConnect() noexcept;
+  void timeoutExpired() noexcept;
+
+  /**
+   * Attempt to read from the socket.
+   *
+   * @param buf      The buffer to read data into.
+   * @param buflen   The length of the buffer.
+   *
+   * @return Returns the number of bytes read, or READ_EOF on EOF, or
+   * READ_ERROR on error, or READ_BLOCKING if the operation will
+   * block.
+   */
+  virtual ssize_t performRead(void* buf, size_t buflen);
+
+  /**
+   * Populate an iovec array from an IOBuf and attempt to write it.
+   *
+   * @param callback Write completion/error callback.
+   * @param vec      Target iovec array; caller retains ownership.
+   * @param count    Number of IOBufs to write, beginning at start of buf.
+   * @param buf      Chain of iovecs.
+   * @param flags    set of flags for the underlying write calls, like cork
+   */
+  void writeChainImpl(WriteCallback* callback, iovec* vec,
+                      size_t count, std::unique_ptr<folly::IOBuf>&& buf,
+                      WriteFlags flags);
+
+  /**
+   * Write as much data as possible to the socket without blocking,
+   * and queue up any leftover data to send when the socket can
+   * handle writes again.
+   *
+   * @param callback The callback to invoke when the write is completed.
+   * @param vec      Array of buffers to write; this method will make a
+   *                 copy of the vector (but not the buffers themselves)
+   *                 if the write has to be completed asynchronously.
+   * @param count    Number of elements in vec.
+   * @param buf      The IOBuf that manages the buffers referenced by
+   *                 vec, or a pointer to nullptr if the buffers are not
+   *                 associated with an IOBuf.  Note that ownership of
+   *                 the IOBuf is transferred here; upon completion of
+   *                 the write, the AsyncSocket deletes the IOBuf.
+   * @param flags    Set of write flags.
+   */
+  void writeImpl(WriteCallback* callback, const iovec* vec, size_t count,
+                 std::unique_ptr<folly::IOBuf>&& buf,
+                 WriteFlags flags = WriteFlags::NONE);
+
+  /**
+   * Attempt to write to the socket.
+   *
+   * @param vec             The iovec array pointing to the buffers to write.
+   * @param count           The length of the iovec array.
+   * @param flags           Set of write flags.
+   * @param countWritten    On return, the value pointed to by this parameter
+   *                          will contain the number of iovec entries that were
+   *                          fully written.
+   * @param partialWritten  On return, the value pointed to by this parameter
+   *                          will contain the number of bytes written in the
+   *                          partially written iovec entry.
+   *
+   * @return Returns the total number of bytes written, or -1 on error.  If no
+   *     data can be written immediately, 0 is returned.
+   */
+  virtual ssize_t performWrite(const iovec* vec, uint32_t count,
+                               WriteFlags flags, uint32_t* countWritten,
+                               uint32_t* partialWritten);
+
+  bool updateEventRegistration();
+
+  /**
+   * Update event registration.
+   *
+   * @param enable Flags of events to enable. Set it to 0 if no events
+   * need to be enabled in this call.
+   * @param disable Flags of events
+   * to disable. Set it to 0 if no events need to be disabled in this
+   * call.
+   *
+   * @return true iff the update is successful.
+   */
+  bool updateEventRegistration(uint16_t enable, uint16_t disable);
+
+  // Actually close the file descriptor and set it to -1 so we don't
+  // accidentally close it again.
+  void doClose();
+
+  // error handling methods
+  void startFail();
+  void finishFail();
+  void fail(const char* fn, const AsyncSocketException& ex);
+  void failConnect(const char* fn, const AsyncSocketException& ex);
+  void failRead(const char* fn, const AsyncSocketException& ex);
+  void failWrite(const char* fn, WriteCallback* callback, size_t bytesWritten,
+                 const AsyncSocketException& ex);
+  void failWrite(const char* fn, const AsyncSocketException& ex);
+  void failAllWrites(const AsyncSocketException& ex);
+  void invalidState(ConnectCallback* callback);
+  void invalidState(ReadCallback* callback);
+  void invalidState(WriteCallback* callback);
+
+  std::string withAddr(const std::string& s);
+
+  StateEnum state_;                     ///< StateEnum describing current state
+  uint8_t shutdownFlags_;               ///< Shutdown state (ShutdownFlags)
+  uint16_t eventFlags_;                 ///< EventBase::HandlerFlags settings
+  int fd_;                              ///< The socket file descriptor
+  mutable
+    folly::SocketAddress addr_;    ///< The address we tried to connect to
+  uint32_t sendTimeout_;                ///< The send timeout, in milliseconds
+  uint16_t maxReadsPerEvent_;           ///< Max reads per event loop iteration
+  EventBase* eventBase_;               ///< The EventBase
+  WriteTimeout writeTimeout_;           ///< A timeout for connect and write
+  IoHandler ioHandler_;                 ///< A EventHandler to monitor the fd
+
+  ConnectCallback* connectCallback_;    ///< ConnectCallback
+  ReadCallback* readCallback_;          ///< ReadCallback
+  WriteRequest* writeReqHead_;          ///< Chain of WriteRequests
+  WriteRequest* writeReqTail_;          ///< End of WriteRequest chain
+  ShutdownSocketSet* shutdownSocketSet_;
+  size_t appBytesReceived_;             ///< Num of bytes received from socket
+  size_t appBytesWritten_;              ///< Num of bytes written to socket
+};
+
+
+} // folly
diff --git a/folly/io/async/AsyncSocketException.h b/folly/io/async/AsyncSocketException.h
new file mode 100644 (file)
index 0000000..762e9bc
--- /dev/null
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <folly/io/async/DelayedDestruction.h>
+
+namespace folly {
+
+class AsyncSocketException : public std::runtime_error {
+ public:
+  enum AsyncSocketExceptionType
+  { UNKNOWN = 0
+  , NOT_OPEN = 1
+  , ALREADY_OPEN = 2
+  , TIMED_OUT = 3
+  , END_OF_FILE = 4
+  , INTERRUPTED = 5
+  , BAD_ARGS = 6
+  , CORRUPTED_DATA = 7
+  , INTERNAL_ERROR = 8
+  , NOT_SUPPORTED = 9
+  , INVALID_STATE = 10
+  , SSL_ERROR = 12
+  , COULD_NOT_BIND = 13
+  , SASL_HANDSHAKE_TIMEOUT = 14
+  };
+
+  AsyncSocketException(
+    AsyncSocketExceptionType type, const std::string& message) :
+      std::runtime_error(message),
+      type_(type), errno_(0) {}
+
+  /** Error code */
+  AsyncSocketExceptionType type_;
+
+  /** A copy of the errno. */
+  int errno_;
+
+  AsyncSocketException(AsyncSocketExceptionType type,
+                      const std::string& message,
+                      int errno_copy) :
+      std::runtime_error(getMessage(message, errno_copy)),
+      type_(type), errno_(errno_copy) {}
+
+  AsyncSocketExceptionType getType() const noexcept { return type_; }
+  int getErrno() const noexcept { return errno_; }
+
+ protected:
+  /** Just like strerror_r but returns a C++ string object. */
+  std::string strerror_s(int errno_copy) {
+    return "errno = " + folly::to<std::string>(errno_copy);
+  }
+
+  /** Return a message based on the input. */
+  std::string getMessage(const std::string &message,
+                                int errno_copy) {
+    if (errno_copy != 0) {
+      return message + ": " + strerror_s(errno_copy);
+    } else {
+      return message;
+    }
+  }
+};
+
+} // folly
diff --git a/folly/io/async/AsyncTransport.h b/folly/io/async/AsyncTransport.h
new file mode 100644 (file)
index 0000000..0b929c1
--- /dev/null
@@ -0,0 +1,317 @@
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+namespace folly {
+
+/*
+ * flags given by the application for write* calls
+ */
+enum class WriteFlags : uint32_t {
+  NONE = 0x00,
+  /*
+   * Whether to delay the output until a subsequent non-corked write.
+   * (Note: may not be supported in all subclasses or on all platforms.)
+   */
+  CORK = 0x01,
+  /*
+   * for a socket that has ACK latency enabled, it will cause the kernel
+   * to fire a TCP ESTATS event when the last byte of the given write call
+   * will be acknowledged.
+   */
+  EOR = 0x02,
+};
+
+/*
+ * union operator
+ */
+inline WriteFlags operator|(WriteFlags a, WriteFlags b) {
+  return static_cast<WriteFlags>(
+    static_cast<uint32_t>(a) | static_cast<uint32_t>(b));
+}
+
+/*
+ * intersection operator
+ */
+inline WriteFlags operator&(WriteFlags a, WriteFlags b) {
+  return static_cast<WriteFlags>(
+    static_cast<uint32_t>(a) & static_cast<uint32_t>(b));
+}
+
+/*
+ * exclusion parameter
+ */
+inline WriteFlags operator~(WriteFlags a) {
+  return static_cast<WriteFlags>(~static_cast<uint32_t>(a));
+}
+
+/*
+ * unset operator
+ */
+inline WriteFlags unSet(WriteFlags a, WriteFlags b) {
+  return a & ~b;
+}
+
+/*
+ * inclusion operator
+ */
+inline bool isSet(WriteFlags a, WriteFlags b) {
+  return (a & b) == b;
+}
+
+
+/**
+ * AsyncTransport defines an asynchronous API for streaming I/O.
+ *
+ * This class provides an API to for asynchronously waiting for data
+ * on a streaming transport, and for asynchronously sending data.
+ *
+ * The APIs for reading and writing are intentionally asymmetric.  Waiting for
+ * data to read is a persistent API: a callback is installed, and is notified
+ * whenever new data is available.  It continues to be notified of new events
+ * until it is uninstalled.
+ *
+ * AsyncTransport does not provide read timeout functionality, because it
+ * typically cannot determine when the timeout should be active.  Generally, a
+ * timeout should only be enabled when processing is blocked waiting on data
+ * from the remote endpoint.  For server-side applications, the timeout should
+ * not be active if the server is currently processing one or more outstanding
+ * requests on this transport.  For client-side applications, the timeout
+ * should not be active if there are no requests pending on the transport.
+ * Additionally, if a client has multiple pending requests, it will ususally
+ * want a separate timeout for each request, rather than a single read timeout.
+ *
+ * The write API is fairly intuitive: a user can request to send a block of
+ * data, and a callback will be informed once the entire block has been
+ * transferred to the kernel, or on error.  AsyncTransport does provide a send
+ * timeout, since most callers want to give up if the remote end stops
+ * responding and no further progress can be made sending the data.
+ */
+class AsyncTransport : public DelayedDestruction {
+ public:
+  typedef std::unique_ptr<AsyncTransport, Destructor> UniquePtr;
+
+  /**
+   * Close the transport.
+   *
+   * This gracefully closes the transport, waiting for all pending write
+   * requests to complete before actually closing the underlying transport.
+   *
+   * If a read callback is set, readEOF() will be called immediately.  If there
+   * are outstanding write requests, the close will be delayed until all
+   * remaining writes have completed.  No new writes may be started after
+   * close() has been called.
+   */
+  virtual void close() = 0;
+
+  /**
+   * Close the transport immediately.
+   *
+   * This closes the transport immediately, dropping any outstanding data
+   * waiting to be written.
+   *
+   * If a read callback is set, readEOF() will be called immediately.
+   * If there are outstanding write requests, these requests will be aborted
+   * and writeError() will be invoked immediately on all outstanding write
+   * callbacks.
+   */
+  virtual void closeNow() = 0;
+
+  /**
+   * Reset the transport immediately.
+   *
+   * This closes the transport immediately, sending a reset to the remote peer
+   * if possible to indicate abnormal shutdown.
+   *
+   * Note that not all subclasses implement this reset functionality: some
+   * subclasses may treat reset() the same as closeNow().  Subclasses that use
+   * TCP transports should terminate the connection with a TCP reset.
+   */
+  virtual void closeWithReset() {
+    closeNow();
+  }
+
+  /**
+   * Perform a half-shutdown of the write side of the transport.
+   *
+   * The caller should not make any more calls to write() or writev() after
+   * shutdownWrite() is called.  Any future write attempts will fail
+   * immediately.
+   *
+   * Not all transport types support half-shutdown.  If the underlying
+   * transport does not support half-shutdown, it will fully shutdown both the
+   * read and write sides of the transport.  (Fully shutting down the socket is
+   * better than doing nothing at all, since the caller may rely on the
+   * shutdownWrite() call to notify the other end of the connection that no
+   * more data can be read.)
+   *
+   * If there is pending data still waiting to be written on the transport,
+   * the actual shutdown will be delayed until the pending data has been
+   * written.
+   *
+   * Note: There is no corresponding shutdownRead() equivalent.  Simply
+   * uninstall the read callback if you wish to stop reading.  (On TCP sockets
+   * at least, shutting down the read side of the socket is a no-op anyway.)
+   */
+  virtual void shutdownWrite() = 0;
+
+  /**
+   * Perform a half-shutdown of the write side of the transport.
+   *
+   * shutdownWriteNow() is identical to shutdownWrite(), except that it
+   * immediately performs the shutdown, rather than waiting for pending writes
+   * to complete.  Any pending write requests will be immediately failed when
+   * shutdownWriteNow() is called.
+   */
+  virtual void shutdownWriteNow() = 0;
+
+  /**
+   * Determine if transport is open and ready to read or write.
+   *
+   * Note that this function returns false on EOF; you must also call error()
+   * to distinguish between an EOF and an error.
+   *
+   * @return  true iff the transport is open and ready, false otherwise.
+   */
+  virtual bool good() const = 0;
+
+  /**
+   * Determine if the transport is readable or not.
+   *
+   * @return  true iff the transport is readable, false otherwise.
+   */
+  virtual bool readable() const = 0;
+
+  /**
+   * Determine if the there is pending data on the transport.
+   *
+   * @return  true iff the if the there is pending data, false otherwise.
+   */
+  virtual bool isPending() const {
+    return readable();
+  }
+  /**
+   * Determine if transport is connected to the endpoint
+   *
+   * @return  false iff the transport is connected, otherwise true
+   */
+  virtual bool connecting() const = 0;
+
+  /**
+   * Determine if an error has occurred with this transport.
+   *
+   * @return  true iff an error has occurred (not EOF).
+   */
+  virtual bool error() const = 0;
+
+  /**
+   * Attach the transport to a EventBase.
+   *
+   * This may only be called if the transport is not currently attached to a
+   * EventBase (by an earlier call to detachEventBase()).
+   *
+   * This method must be invoked in the EventBase's thread.
+   */
+  virtual void attachEventBase(EventBase* eventBase) = 0;
+
+  /**
+   * Detach the transport from its EventBase.
+   *
+   * This may only be called when the transport is idle and has no reads or
+   * writes pending.  Once detached, the transport may not be used again until
+   * it is re-attached to a EventBase by calling attachEventBase().
+   *
+   * This method must be called from the current EventBase's thread.
+   */
+  virtual void detachEventBase() = 0;
+
+  /**
+   * Determine if the transport can be detached.
+   *
+   * This method must be called from the current EventBase's thread.
+   */
+  virtual bool isDetachable() const = 0;
+
+  /**
+   * Get the EventBase used by this transport.
+   *
+   * Returns nullptr if this transport is not currently attached to a
+   * EventBase.
+   */
+  virtual EventBase* getEventBase() const = 0;
+
+  /**
+   * Set the send timeout.
+   *
+   * If write requests do not make any progress for more than the specified
+   * number of milliseconds, fail all pending writes and close the transport.
+   *
+   * If write requests are currently pending when setSendTimeout() is called,
+   * the timeout interval is immediately restarted using the new value.
+   *
+   * @param milliseconds  The timeout duration, in milliseconds.  If 0, no
+   *                      timeout will be used.
+   */
+  virtual void setSendTimeout(uint32_t milliseconds) = 0;
+
+  /**
+   * Get the send timeout.
+   *
+   * @return Returns the current send timeout, in milliseconds.  A return value
+   *         of 0 indicates that no timeout is set.
+   */
+  virtual uint32_t getSendTimeout() const = 0;
+
+  /**
+   * Get the address of the local endpoint of this transport.
+   *
+   * This function may throw AsyncSocketException on error.
+   *
+   * @param address  The local address will be stored in the specified
+   *                 SocketAddress.
+   */
+  virtual void getLocalAddress(folly::SocketAddress* address) const = 0;
+
+  /**
+   * Get the address of the remote endpoint to which this transport is
+   * connected.
+   *
+   * This function may throw AsyncSocketException on error.
+   *
+   * @param address  The remote endpoint's address will be stored in the
+   *                 specified SocketAddress.
+   */
+  virtual void getPeerAddress(folly::SocketAddress* address) const = 0;
+
+  /**
+   * @return True iff end of record tracking is enabled
+   */
+  virtual bool isEorTrackingEnabled() const = 0;
+
+  virtual void setEorTracking(bool track) = 0;
+
+  virtual size_t getAppBytesWritten() const = 0;
+  virtual size_t getRawBytesWritten() const = 0;
+  virtual size_t getAppBytesReceived() const = 0;
+  virtual size_t getRawBytesReceived() const = 0;
+
+ protected:
+  virtual ~AsyncTransport() {}
+};
+
+
+} // folly