Handle wrapvFull when IOV_MAX is not defined.
[folly.git] / folly / io / async / AsyncSocket.cpp
1 /*
2  * Copyright 2016 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/io/async/AsyncSocket.h>
18
19 #include <folly/io/async/EventBase.h>
20 #include <folly/io/async/EventHandler.h>
21 #include <folly/SocketAddress.h>
22 #include <folly/io/IOBuf.h>
23 #include <folly/portability/SysUio.h>
24
25 #include <poll.h>
26 #include <errno.h>
27 #include <limits.h>
28 #include <unistd.h>
29 #include <thread>
30 #include <fcntl.h>
31 #include <sys/types.h>
32 #include <sys/socket.h>
33 #include <netinet/in.h>
34 #include <netinet/tcp.h>
35 #include <boost/preprocessor/control/if.hpp>
36
37 using std::string;
38 using std::unique_ptr;
39
40 namespace folly {
41
42 // static members initializers
43 const AsyncSocket::OptionMap AsyncSocket::emptyOptionMap;
44
45 const AsyncSocketException socketClosedLocallyEx(
46     AsyncSocketException::END_OF_FILE, "socket closed locally");
47 const AsyncSocketException socketShutdownForWritesEx(
48     AsyncSocketException::END_OF_FILE, "socket shutdown for writes");
49
50 // TODO: It might help performance to provide a version of BytesWriteRequest that
51 // users could derive from, so we can avoid the extra allocation for each call
52 // to write()/writev().  We could templatize TFramedAsyncChannel just like the
53 // protocols are currently templatized for transports.
54 //
55 // We would need the version for external users where they provide the iovec
56 // storage space, and only our internal version would allocate it at the end of
57 // the WriteRequest.
58
59 /* The default WriteRequest implementation, used for write(), writev() and
60  * writeChain()
61  *
62  * A new BytesWriteRequest operation is allocated on the heap for all write
63  * operations that cannot be completed immediately.
64  */
65 class AsyncSocket::BytesWriteRequest : public AsyncSocket::WriteRequest {
66  public:
67   static BytesWriteRequest* newRequest(AsyncSocket* socket,
68                                        WriteCallback* callback,
69                                        const iovec* ops,
70                                        uint32_t opCount,
71                                        uint32_t partialWritten,
72                                        uint32_t bytesWritten,
73                                        unique_ptr<IOBuf>&& ioBuf,
74                                        WriteFlags flags) {
75     assert(opCount > 0);
76     // Since we put a variable size iovec array at the end
77     // of each BytesWriteRequest, we have to manually allocate the memory.
78     void* buf = malloc(sizeof(BytesWriteRequest) +
79                        (opCount * sizeof(struct iovec)));
80     if (buf == nullptr) {
81       throw std::bad_alloc();
82     }
83
84     return new(buf) BytesWriteRequest(socket, callback, ops, opCount,
85                                       partialWritten, bytesWritten,
86                                       std::move(ioBuf), flags);
87   }
88
89   void destroy() override {
90     this->~BytesWriteRequest();
91     free(this);
92   }
93
94   bool performWrite() override {
95     WriteFlags writeFlags = flags_;
96     if (getNext() != nullptr) {
97       writeFlags = writeFlags | WriteFlags::CORK;
98     }
99     bytesWritten_ = socket_->performWrite(getOps(), getOpCount(), writeFlags,
100                                           &opsWritten_, &partialBytes_);
101     return bytesWritten_ >= 0;
102   }
103
104   bool isComplete() override {
105     return opsWritten_ == getOpCount();
106   }
107
108   void consume() override {
109     // Advance opIndex_ forward by opsWritten_
110     opIndex_ += opsWritten_;
111     assert(opIndex_ < opCount_);
112
113     // If we've finished writing any IOBufs, release them
114     if (ioBuf_) {
115       for (uint32_t i = opsWritten_; i != 0; --i) {
116         assert(ioBuf_);
117         ioBuf_ = ioBuf_->pop();
118       }
119     }
120
121     // Move partialBytes_ forward into the current iovec buffer
122     struct iovec* currentOp = writeOps_ + opIndex_;
123     assert((partialBytes_ < currentOp->iov_len) || (currentOp->iov_len == 0));
124     currentOp->iov_base =
125       reinterpret_cast<uint8_t*>(currentOp->iov_base) + partialBytes_;
126     currentOp->iov_len -= partialBytes_;
127
128     // Increment the totalBytesWritten_ count by bytesWritten_;
129     totalBytesWritten_ += bytesWritten_;
130   }
131
132  private:
133   BytesWriteRequest(AsyncSocket* socket,
134                     WriteCallback* callback,
135                     const struct iovec* ops,
136                     uint32_t opCount,
137                     uint32_t partialBytes,
138                     uint32_t bytesWritten,
139                     unique_ptr<IOBuf>&& ioBuf,
140                     WriteFlags flags)
141     : AsyncSocket::WriteRequest(socket, callback)
142     , opCount_(opCount)
143     , opIndex_(0)
144     , flags_(flags)
145     , ioBuf_(std::move(ioBuf))
146     , opsWritten_(0)
147     , partialBytes_(partialBytes)
148     , bytesWritten_(bytesWritten) {
149     memcpy(writeOps_, ops, sizeof(*ops) * opCount_);
150   }
151
152   // private destructor, to ensure callers use destroy()
153   ~BytesWriteRequest() override = default;
154
155   const struct iovec* getOps() const {
156     assert(opCount_ > opIndex_);
157     return writeOps_ + opIndex_;
158   }
159
160   uint32_t getOpCount() const {
161     assert(opCount_ > opIndex_);
162     return opCount_ - opIndex_;
163   }
164
165   uint32_t opCount_;            ///< number of entries in writeOps_
166   uint32_t opIndex_;            ///< current index into writeOps_
167   WriteFlags flags_;            ///< set for WriteFlags
168   unique_ptr<IOBuf> ioBuf_;     ///< underlying IOBuf, or nullptr if N/A
169
170   // for consume(), how much we wrote on the last write
171   uint32_t opsWritten_;         ///< complete ops written
172   uint32_t partialBytes_;       ///< partial bytes of incomplete op written
173   ssize_t bytesWritten_;        ///< bytes written altogether
174
175   struct iovec writeOps_[];     ///< write operation(s) list
176 };
177
178 AsyncSocket::AsyncSocket()
179   : eventBase_(nullptr)
180   , writeTimeout_(this, nullptr)
181   , ioHandler_(this, nullptr)
182   , immediateReadHandler_(this) {
183   VLOG(5) << "new AsyncSocket()";
184   init();
185 }
186
187 AsyncSocket::AsyncSocket(EventBase* evb)
188   : eventBase_(evb)
189   , writeTimeout_(this, evb)
190   , ioHandler_(this, evb)
191   , immediateReadHandler_(this) {
192   VLOG(5) << "new AsyncSocket(" << this << ", evb=" << evb << ")";
193   init();
194 }
195
196 AsyncSocket::AsyncSocket(EventBase* evb,
197                            const folly::SocketAddress& address,
198                            uint32_t connectTimeout)
199   : AsyncSocket(evb) {
200   connect(nullptr, address, connectTimeout);
201 }
202
203 AsyncSocket::AsyncSocket(EventBase* evb,
204                            const std::string& ip,
205                            uint16_t port,
206                            uint32_t connectTimeout)
207   : AsyncSocket(evb) {
208   connect(nullptr, ip, port, connectTimeout);
209 }
210
211 AsyncSocket::AsyncSocket(EventBase* evb, int fd)
212   : eventBase_(evb)
213   , writeTimeout_(this, evb)
214   , ioHandler_(this, evb, fd)
215   , immediateReadHandler_(this) {
216   VLOG(5) << "new AsyncSocket(" << this << ", evb=" << evb << ", fd="
217           << fd << ")";
218   init();
219   fd_ = fd;
220   setCloseOnExec();
221   state_ = StateEnum::ESTABLISHED;
222 }
223
224 // init() method, since constructor forwarding isn't supported in most
225 // compilers yet.
226 void AsyncSocket::init() {
227   assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
228   shutdownFlags_ = 0;
229   state_ = StateEnum::UNINIT;
230   eventFlags_ = EventHandler::NONE;
231   fd_ = -1;
232   sendTimeout_ = 0;
233   maxReadsPerEvent_ = 16;
234   connectCallback_ = nullptr;
235   readCallback_ = nullptr;
236   writeReqHead_ = nullptr;
237   writeReqTail_ = nullptr;
238   shutdownSocketSet_ = nullptr;
239   appBytesWritten_ = 0;
240   appBytesReceived_ = 0;
241 }
242
243 AsyncSocket::~AsyncSocket() {
244   VLOG(7) << "actual destruction of AsyncSocket(this=" << this
245           << ", evb=" << eventBase_ << ", fd=" << fd_
246           << ", state=" << state_ << ")";
247 }
248
249 void AsyncSocket::destroy() {
250   VLOG(5) << "AsyncSocket::destroy(this=" << this << ", evb=" << eventBase_
251           << ", fd=" << fd_ << ", state=" << state_;
252   // When destroy is called, close the socket immediately
253   closeNow();
254
255   // Then call DelayedDestruction::destroy() to take care of
256   // whether or not we need immediate or delayed destruction
257   DelayedDestruction::destroy();
258 }
259
260 int AsyncSocket::detachFd() {
261   VLOG(6) << "AsyncSocket::detachFd(this=" << this << ", fd=" << fd_
262           << ", evb=" << eventBase_ << ", state=" << state_
263           << ", events=" << std::hex << eventFlags_ << ")";
264   // Extract the fd, and set fd_ to -1 first, so closeNow() won't
265   // actually close the descriptor.
266   if (shutdownSocketSet_) {
267     shutdownSocketSet_->remove(fd_);
268   }
269   int fd = fd_;
270   fd_ = -1;
271   // Call closeNow() to invoke all pending callbacks with an error.
272   closeNow();
273   // Update the EventHandler to stop using this fd.
274   // This can only be done after closeNow() unregisters the handler.
275   ioHandler_.changeHandlerFD(-1);
276   return fd;
277 }
278
279 const folly::SocketAddress& AsyncSocket::anyAddress() {
280   static const folly::SocketAddress anyAddress =
281     folly::SocketAddress("0.0.0.0", 0);
282   return anyAddress;
283 }
284
285 void AsyncSocket::setShutdownSocketSet(ShutdownSocketSet* newSS) {
286   if (shutdownSocketSet_ == newSS) {
287     return;
288   }
289   if (shutdownSocketSet_ && fd_ != -1) {
290     shutdownSocketSet_->remove(fd_);
291   }
292   shutdownSocketSet_ = newSS;
293   if (shutdownSocketSet_ && fd_ != -1) {
294     shutdownSocketSet_->add(fd_);
295   }
296 }
297
298 void AsyncSocket::setCloseOnExec() {
299   int rv = fcntl(fd_, F_SETFD, FD_CLOEXEC);
300   if (rv != 0) {
301     throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
302                                withAddr("failed to set close-on-exec flag"),
303                                errno);
304   }
305 }
306
307 void AsyncSocket::connect(ConnectCallback* callback,
308                            const folly::SocketAddress& address,
309                            int timeout,
310                            const OptionMap &options,
311                            const folly::SocketAddress& bindAddr) noexcept {
312   DestructorGuard dg(this);
313   assert(eventBase_->isInEventBaseThread());
314
315   addr_ = address;
316
317   // Make sure we're in the uninitialized state
318   if (state_ != StateEnum::UNINIT) {
319     return invalidState(callback);
320   }
321
322   connectStartTime_ = std::chrono::steady_clock::now();
323   // Make connect end time at least >= connectStartTime.
324   connectEndTime_ = connectStartTime_;
325
326   assert(fd_ == -1);
327   state_ = StateEnum::CONNECTING;
328   connectCallback_ = callback;
329
330   sockaddr_storage addrStorage;
331   sockaddr* saddr = reinterpret_cast<sockaddr*>(&addrStorage);
332
333   try {
334     // Create the socket
335     // Technically the first parameter should actually be a protocol family
336     // constant (PF_xxx) rather than an address family (AF_xxx), but the
337     // distinction is mainly just historical.  In pretty much all
338     // implementations the PF_foo and AF_foo constants are identical.
339     fd_ = socket(address.getFamily(), SOCK_STREAM, 0);
340     if (fd_ < 0) {
341       throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
342                                 withAddr("failed to create socket"), errno);
343     }
344     if (shutdownSocketSet_) {
345       shutdownSocketSet_->add(fd_);
346     }
347     ioHandler_.changeHandlerFD(fd_);
348
349     setCloseOnExec();
350
351     // Put the socket in non-blocking mode
352     int flags = fcntl(fd_, F_GETFL, 0);
353     if (flags == -1) {
354       throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
355                                 withAddr("failed to get socket flags"), errno);
356     }
357     int rv = fcntl(fd_, F_SETFL, flags | O_NONBLOCK);
358     if (rv == -1) {
359       throw AsyncSocketException(
360           AsyncSocketException::INTERNAL_ERROR,
361           withAddr("failed to put socket in non-blocking mode"),
362           errno);
363     }
364
365 #if !defined(MSG_NOSIGNAL) && defined(F_SETNOSIGPIPE)
366     // iOS and OS X don't support MSG_NOSIGNAL; set F_SETNOSIGPIPE instead
367     rv = fcntl(fd_, F_SETNOSIGPIPE, 1);
368     if (rv == -1) {
369       throw AsyncSocketException(
370           AsyncSocketException::INTERNAL_ERROR,
371           "failed to enable F_SETNOSIGPIPE on socket",
372           errno);
373     }
374 #endif
375
376     // By default, turn on TCP_NODELAY
377     // If setNoDelay() fails, we continue anyway; this isn't a fatal error.
378     // setNoDelay() will log an error message if it fails.
379     if (address.getFamily() != AF_UNIX) {
380       (void)setNoDelay(true);
381     }
382
383     VLOG(5) << "AsyncSocket::connect(this=" << this << ", evb=" << eventBase_
384             << ", fd=" << fd_ << ", host=" << address.describe().c_str();
385
386     // bind the socket
387     if (bindAddr != anyAddress()) {
388       int one = 1;
389       if (::setsockopt(fd_, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one))) {
390         doClose();
391         throw AsyncSocketException(
392           AsyncSocketException::NOT_OPEN,
393           "failed to setsockopt prior to bind on " + bindAddr.describe(),
394           errno);
395       }
396
397       bindAddr.getAddress(&addrStorage);
398
399       if (::bind(fd_, saddr, bindAddr.getActualSize()) != 0) {
400         doClose();
401         throw AsyncSocketException(AsyncSocketException::NOT_OPEN,
402                                   "failed to bind to async socket: " +
403                                   bindAddr.describe(),
404                                   errno);
405       }
406     }
407
408     // Apply the additional options if any.
409     for (const auto& opt: options) {
410       int rv = opt.first.apply(fd_, opt.second);
411       if (rv != 0) {
412         throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
413                                   withAddr("failed to set socket option"),
414                                   errno);
415       }
416     }
417
418     // Perform the connect()
419     address.getAddress(&addrStorage);
420
421     rv = ::connect(fd_, saddr, address.getActualSize());
422     if (rv < 0) {
423       if (errno == EINPROGRESS) {
424         // Connection in progress.
425         if (timeout > 0) {
426           // Start a timer in case the connection takes too long.
427           if (!writeTimeout_.scheduleTimeout(timeout)) {
428             throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
429                 withAddr("failed to schedule AsyncSocket connect timeout"));
430           }
431         }
432
433         // Register for write events, so we'll
434         // be notified when the connection finishes/fails.
435         // Note that we don't register for a persistent event here.
436         assert(eventFlags_ == EventHandler::NONE);
437         eventFlags_ = EventHandler::WRITE;
438         if (!ioHandler_.registerHandler(eventFlags_)) {
439           throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
440               withAddr("failed to register AsyncSocket connect handler"));
441         }
442         return;
443       } else {
444         throw AsyncSocketException(AsyncSocketException::NOT_OPEN,
445                                   "connect failed (immediately)", errno);
446       }
447     }
448
449     // If we're still here the connect() succeeded immediately.
450     // Fall through to call the callback outside of this try...catch block
451   } catch (const AsyncSocketException& ex) {
452     return failConnect(__func__, ex);
453   } catch (const std::exception& ex) {
454     // shouldn't happen, but handle it just in case
455     VLOG(4) << "AsyncSocket::connect(this=" << this << ", fd=" << fd_
456                << "): unexpected " << typeid(ex).name() << " exception: "
457                << ex.what();
458     AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR,
459                             withAddr(string("unexpected exception: ") +
460                                      ex.what()));
461     return failConnect(__func__, tex);
462   }
463
464   // The connection succeeded immediately
465   // The read callback may not have been set yet, and no writes may be pending
466   // yet, so we don't have to register for any events at the moment.
467   VLOG(8) << "AsyncSocket::connect succeeded immediately; this=" << this;
468   assert(readCallback_ == nullptr);
469   assert(writeReqHead_ == nullptr);
470   state_ = StateEnum::ESTABLISHED;
471   invokeConnectSuccess();
472 }
473
474 void AsyncSocket::connect(ConnectCallback* callback,
475                            const string& ip, uint16_t port,
476                            int timeout,
477                            const OptionMap &options) noexcept {
478   DestructorGuard dg(this);
479   try {
480     connectCallback_ = callback;
481     connect(callback, folly::SocketAddress(ip, port), timeout, options);
482   } catch (const std::exception& ex) {
483     AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR,
484                             ex.what());
485     return failConnect(__func__, tex);
486   }
487 }
488
489 void AsyncSocket::cancelConnect() {
490   connectCallback_ = nullptr;
491   if (state_ == StateEnum::CONNECTING) {
492     closeNow();
493   }
494 }
495
496 void AsyncSocket::setSendTimeout(uint32_t milliseconds) {
497   sendTimeout_ = milliseconds;
498   assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
499
500   // If we are currently pending on write requests, immediately update
501   // writeTimeout_ with the new value.
502   if ((eventFlags_ & EventHandler::WRITE) &&
503       (state_ != StateEnum::CONNECTING)) {
504     assert(state_ == StateEnum::ESTABLISHED);
505     assert((shutdownFlags_ & SHUT_WRITE) == 0);
506     if (sendTimeout_ > 0) {
507       if (!writeTimeout_.scheduleTimeout(sendTimeout_)) {
508         AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
509             withAddr("failed to reschedule send timeout in setSendTimeout"));
510         return failWrite(__func__, ex);
511       }
512     } else {
513       writeTimeout_.cancelTimeout();
514     }
515   }
516 }
517
518 void AsyncSocket::setReadCB(ReadCallback *callback) {
519   VLOG(6) << "AsyncSocket::setReadCallback() this=" << this << ", fd=" << fd_
520           << ", callback=" << callback << ", state=" << state_;
521
522   // Short circuit if callback is the same as the existing readCallback_.
523   //
524   // Note that this is needed for proper functioning during some cleanup cases.
525   // During cleanup we allow setReadCallback(nullptr) to be called even if the
526   // read callback is already unset and we have been detached from an event
527   // base.  This check prevents us from asserting
528   // eventBase_->isInEventBaseThread() when eventBase_ is nullptr.
529   if (callback == readCallback_) {
530     return;
531   }
532
533   /* We are removing a read callback */
534   if (callback == nullptr &&
535       immediateReadHandler_.isLoopCallbackScheduled()) {
536     immediateReadHandler_.cancelLoopCallback();
537   }
538
539   if (shutdownFlags_ & SHUT_READ) {
540     // Reads have already been shut down on this socket.
541     //
542     // Allow setReadCallback(nullptr) to be called in this case, but don't
543     // allow a new callback to be set.
544     //
545     // For example, setReadCallback(nullptr) can happen after an error if we
546     // invoke some other error callback before invoking readError().  The other
547     // error callback that is invoked first may go ahead and clear the read
548     // callback before we get a chance to invoke readError().
549     if (callback != nullptr) {
550       return invalidState(callback);
551     }
552     assert((eventFlags_ & EventHandler::READ) == 0);
553     readCallback_ = nullptr;
554     return;
555   }
556
557   DestructorGuard dg(this);
558   assert(eventBase_->isInEventBaseThread());
559
560   switch ((StateEnum)state_) {
561     case StateEnum::CONNECTING:
562       // For convenience, we allow the read callback to be set while we are
563       // still connecting.  We just store the callback for now.  Once the
564       // connection completes we'll register for read events.
565       readCallback_ = callback;
566       return;
567     case StateEnum::ESTABLISHED:
568     {
569       readCallback_ = callback;
570       uint16_t oldFlags = eventFlags_;
571       if (readCallback_) {
572         eventFlags_ |= EventHandler::READ;
573       } else {
574         eventFlags_ &= ~EventHandler::READ;
575       }
576
577       // Update our registration if our flags have changed
578       if (eventFlags_ != oldFlags) {
579         // We intentionally ignore the return value here.
580         // updateEventRegistration() will move us into the error state if it
581         // fails, and we don't need to do anything else here afterwards.
582         (void)updateEventRegistration();
583       }
584
585       if (readCallback_) {
586         checkForImmediateRead();
587       }
588       return;
589     }
590     case StateEnum::CLOSED:
591     case StateEnum::ERROR:
592       // We should never reach here.  SHUT_READ should always be set
593       // if we are in STATE_CLOSED or STATE_ERROR.
594       assert(false);
595       return invalidState(callback);
596     case StateEnum::UNINIT:
597       // We do not allow setReadCallback() to be called before we start
598       // connecting.
599       return invalidState(callback);
600   }
601
602   // We don't put a default case in the switch statement, so that the compiler
603   // will warn us to update the switch statement if a new state is added.
604   return invalidState(callback);
605 }
606
607 AsyncSocket::ReadCallback* AsyncSocket::getReadCallback() const {
608   return readCallback_;
609 }
610
611 void AsyncSocket::write(WriteCallback* callback,
612                          const void* buf, size_t bytes, WriteFlags flags) {
613   iovec op;
614   op.iov_base = const_cast<void*>(buf);
615   op.iov_len = bytes;
616   writeImpl(callback, &op, 1, unique_ptr<IOBuf>(), flags);
617 }
618
619 void AsyncSocket::writev(WriteCallback* callback,
620                           const iovec* vec,
621                           size_t count,
622                           WriteFlags flags) {
623   writeImpl(callback, vec, count, unique_ptr<IOBuf>(), flags);
624 }
625
626 void AsyncSocket::writeChain(WriteCallback* callback, unique_ptr<IOBuf>&& buf,
627                               WriteFlags flags) {
628   constexpr size_t kSmallSizeMax = 64;
629   size_t count = buf->countChainElements();
630   if (count <= kSmallSizeMax) {
631     iovec vec[BOOST_PP_IF(FOLLY_HAVE_VLA, count, kSmallSizeMax)];
632     writeChainImpl(callback, vec, count, std::move(buf), flags);
633   } else {
634     iovec* vec = new iovec[count];
635     writeChainImpl(callback, vec, count, std::move(buf), flags);
636     delete[] vec;
637   }
638 }
639
640 void AsyncSocket::writeChainImpl(WriteCallback* callback, iovec* vec,
641     size_t count, unique_ptr<IOBuf>&& buf, WriteFlags flags) {
642   size_t veclen = buf->fillIov(vec, count);
643   writeImpl(callback, vec, veclen, std::move(buf), flags);
644 }
645
646 void AsyncSocket::writeImpl(WriteCallback* callback, const iovec* vec,
647                              size_t count, unique_ptr<IOBuf>&& buf,
648                              WriteFlags flags) {
649   VLOG(6) << "AsyncSocket::writev() this=" << this << ", fd=" << fd_
650           << ", callback=" << callback << ", count=" << count
651           << ", state=" << state_;
652   DestructorGuard dg(this);
653   unique_ptr<IOBuf>ioBuf(std::move(buf));
654   assert(eventBase_->isInEventBaseThread());
655
656   if (shutdownFlags_ & (SHUT_WRITE | SHUT_WRITE_PENDING)) {
657     // No new writes may be performed after the write side of the socket has
658     // been shutdown.
659     //
660     // We could just call callback->writeError() here to fail just this write.
661     // However, fail hard and use invalidState() to fail all outstanding
662     // callbacks and move the socket into the error state.  There's most likely
663     // a bug in the caller's code, so we abort everything rather than trying to
664     // proceed as best we can.
665     return invalidState(callback);
666   }
667
668   uint32_t countWritten = 0;
669   uint32_t partialWritten = 0;
670   int bytesWritten = 0;
671   bool mustRegister = false;
672   if (state_ == StateEnum::ESTABLISHED && !connecting()) {
673     if (writeReqHead_ == nullptr) {
674       // If we are established and there are no other writes pending,
675       // we can attempt to perform the write immediately.
676       assert(writeReqTail_ == nullptr);
677       assert((eventFlags_ & EventHandler::WRITE) == 0);
678
679       bytesWritten = performWrite(vec, count, flags,
680                                   &countWritten, &partialWritten);
681       if (bytesWritten < 0) {
682         AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
683                                withAddr("writev failed"), errno);
684         return failWrite(__func__, callback, 0, ex);
685       } else if (countWritten == count) {
686         // We successfully wrote everything.
687         // Invoke the callback and return.
688         if (callback) {
689           callback->writeSuccess();
690         }
691         return;
692       } else { // continue writing the next writeReq
693         if (bufferCallback_) {
694           bufferCallback_->onEgressBuffered();
695         }
696       }
697       mustRegister = true;
698     }
699   } else if (!connecting()) {
700     // Invalid state for writing
701     return invalidState(callback);
702   }
703
704   // Create a new WriteRequest to add to the queue
705   WriteRequest* req;
706   try {
707     req = BytesWriteRequest::newRequest(this, callback, vec + countWritten,
708                                         count - countWritten, partialWritten,
709                                         bytesWritten, std::move(ioBuf), flags);
710   } catch (const std::exception& ex) {
711     // we mainly expect to catch std::bad_alloc here
712     AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR,
713         withAddr(string("failed to append new WriteRequest: ") + ex.what()));
714     return failWrite(__func__, callback, bytesWritten, tex);
715   }
716   req->consume();
717   if (writeReqTail_ == nullptr) {
718     assert(writeReqHead_ == nullptr);
719     writeReqHead_ = writeReqTail_ = req;
720   } else {
721     writeReqTail_->append(req);
722     writeReqTail_ = req;
723   }
724
725   // Register for write events if are established and not currently
726   // waiting on write events
727   if (mustRegister) {
728     assert(state_ == StateEnum::ESTABLISHED);
729     assert((eventFlags_ & EventHandler::WRITE) == 0);
730     if (!updateEventRegistration(EventHandler::WRITE, 0)) {
731       assert(state_ == StateEnum::ERROR);
732       return;
733     }
734     if (sendTimeout_ > 0) {
735       // Schedule a timeout to fire if the write takes too long.
736       if (!writeTimeout_.scheduleTimeout(sendTimeout_)) {
737         AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
738                                withAddr("failed to schedule send timeout"));
739         return failWrite(__func__, ex);
740       }
741     }
742   }
743 }
744
745 void AsyncSocket::writeRequest(WriteRequest* req) {
746   if (writeReqTail_ == nullptr) {
747     assert(writeReqHead_ == nullptr);
748     writeReqHead_ = writeReqTail_ = req;
749     req->start();
750   } else {
751     writeReqTail_->append(req);
752     writeReqTail_ = req;
753   }
754 }
755
756 void AsyncSocket::close() {
757   VLOG(5) << "AsyncSocket::close(): this=" << this << ", fd_=" << fd_
758           << ", state=" << state_ << ", shutdownFlags="
759           << std::hex << (int) shutdownFlags_;
760
761   // close() is only different from closeNow() when there are pending writes
762   // that need to drain before we can close.  In all other cases, just call
763   // closeNow().
764   //
765   // Note that writeReqHead_ can be non-nullptr even in STATE_CLOSED or
766   // STATE_ERROR if close() is invoked while a previous closeNow() or failure
767   // is still running.  (e.g., If there are multiple pending writes, and we
768   // call writeError() on the first one, it may call close().  In this case we
769   // will already be in STATE_CLOSED or STATE_ERROR, but the remaining pending
770   // writes will still be in the queue.)
771   //
772   // We only need to drain pending writes if we are still in STATE_CONNECTING
773   // or STATE_ESTABLISHED
774   if ((writeReqHead_ == nullptr) ||
775       !(state_ == StateEnum::CONNECTING ||
776       state_ == StateEnum::ESTABLISHED)) {
777     closeNow();
778     return;
779   }
780
781   // Declare a DestructorGuard to ensure that the AsyncSocket cannot be
782   // destroyed until close() returns.
783   DestructorGuard dg(this);
784   assert(eventBase_->isInEventBaseThread());
785
786   // Since there are write requests pending, we have to set the
787   // SHUT_WRITE_PENDING flag, and wait to perform the real close until the
788   // connect finishes and we finish writing these requests.
789   //
790   // Set SHUT_READ to indicate that reads are shut down, and set the
791   // SHUT_WRITE_PENDING flag to mark that we want to shutdown once the
792   // pending writes complete.
793   shutdownFlags_ |= (SHUT_READ | SHUT_WRITE_PENDING);
794
795   // If a read callback is set, invoke readEOF() immediately to inform it that
796   // the socket has been closed and no more data can be read.
797   if (readCallback_) {
798     // Disable reads if they are enabled
799     if (!updateEventRegistration(0, EventHandler::READ)) {
800       // We're now in the error state; callbacks have been cleaned up
801       assert(state_ == StateEnum::ERROR);
802       assert(readCallback_ == nullptr);
803     } else {
804       ReadCallback* callback = readCallback_;
805       readCallback_ = nullptr;
806       callback->readEOF();
807     }
808   }
809 }
810
811 void AsyncSocket::closeNow() {
812   VLOG(5) << "AsyncSocket::closeNow(): this=" << this << ", fd_=" << fd_
813           << ", state=" << state_ << ", shutdownFlags="
814           << std::hex << (int) shutdownFlags_;
815   DestructorGuard dg(this);
816   assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
817
818   switch (state_) {
819     case StateEnum::ESTABLISHED:
820     case StateEnum::CONNECTING:
821     {
822       shutdownFlags_ |= (SHUT_READ | SHUT_WRITE);
823       state_ = StateEnum::CLOSED;
824
825       // If the write timeout was set, cancel it.
826       writeTimeout_.cancelTimeout();
827
828       // If we are registered for I/O events, unregister.
829       if (eventFlags_ != EventHandler::NONE) {
830         eventFlags_ = EventHandler::NONE;
831         if (!updateEventRegistration()) {
832           // We will have been moved into the error state.
833           assert(state_ == StateEnum::ERROR);
834           return;
835         }
836       }
837
838       if (immediateReadHandler_.isLoopCallbackScheduled()) {
839         immediateReadHandler_.cancelLoopCallback();
840       }
841
842       if (fd_ >= 0) {
843         ioHandler_.changeHandlerFD(-1);
844         doClose();
845       }
846
847       invokeConnectErr(socketClosedLocallyEx);
848
849       failAllWrites(socketClosedLocallyEx);
850
851       if (readCallback_) {
852         ReadCallback* callback = readCallback_;
853         readCallback_ = nullptr;
854         callback->readEOF();
855       }
856       return;
857     }
858     case StateEnum::CLOSED:
859       // Do nothing.  It's possible that we are being called recursively
860       // from inside a callback that we invoked inside another call to close()
861       // that is still running.
862       return;
863     case StateEnum::ERROR:
864       // Do nothing.  The error handling code has performed (or is performing)
865       // cleanup.
866       return;
867     case StateEnum::UNINIT:
868       assert(eventFlags_ == EventHandler::NONE);
869       assert(connectCallback_ == nullptr);
870       assert(readCallback_ == nullptr);
871       assert(writeReqHead_ == nullptr);
872       shutdownFlags_ |= (SHUT_READ | SHUT_WRITE);
873       state_ = StateEnum::CLOSED;
874       return;
875   }
876
877   LOG(DFATAL) << "AsyncSocket::closeNow() (this=" << this << ", fd=" << fd_
878               << ") called in unknown state " << state_;
879 }
880
881 void AsyncSocket::closeWithReset() {
882   // Enable SO_LINGER, with the linger timeout set to 0.
883   // This will trigger a TCP reset when we close the socket.
884   if (fd_ >= 0) {
885     struct linger optLinger = {1, 0};
886     if (setSockOpt(SOL_SOCKET, SO_LINGER, &optLinger) != 0) {
887       VLOG(2) << "AsyncSocket::closeWithReset(): error setting SO_LINGER "
888               << "on " << fd_ << ": errno=" << errno;
889     }
890   }
891
892   // Then let closeNow() take care of the rest
893   closeNow();
894 }
895
896 void AsyncSocket::shutdownWrite() {
897   VLOG(5) << "AsyncSocket::shutdownWrite(): this=" << this << ", fd=" << fd_
898           << ", state=" << state_ << ", shutdownFlags="
899           << std::hex << (int) shutdownFlags_;
900
901   // If there are no pending writes, shutdownWrite() is identical to
902   // shutdownWriteNow().
903   if (writeReqHead_ == nullptr) {
904     shutdownWriteNow();
905     return;
906   }
907
908   assert(eventBase_->isInEventBaseThread());
909
910   // There are pending writes.  Set SHUT_WRITE_PENDING so that the actual
911   // shutdown will be performed once all writes complete.
912   shutdownFlags_ |= SHUT_WRITE_PENDING;
913 }
914
915 void AsyncSocket::shutdownWriteNow() {
916   VLOG(5) << "AsyncSocket::shutdownWriteNow(): this=" << this
917           << ", fd=" << fd_ << ", state=" << state_
918           << ", shutdownFlags=" << std::hex << (int) shutdownFlags_;
919
920   if (shutdownFlags_ & SHUT_WRITE) {
921     // Writes are already shutdown; nothing else to do.
922     return;
923   }
924
925   // If SHUT_READ is already set, just call closeNow() to completely
926   // close the socket.  This can happen if close() was called with writes
927   // pending, and then shutdownWriteNow() is called before all pending writes
928   // complete.
929   if (shutdownFlags_ & SHUT_READ) {
930     closeNow();
931     return;
932   }
933
934   DestructorGuard dg(this);
935   assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
936
937   switch (static_cast<StateEnum>(state_)) {
938     case StateEnum::ESTABLISHED:
939     {
940       shutdownFlags_ |= SHUT_WRITE;
941
942       // If the write timeout was set, cancel it.
943       writeTimeout_.cancelTimeout();
944
945       // If we are registered for write events, unregister.
946       if (!updateEventRegistration(0, EventHandler::WRITE)) {
947         // We will have been moved into the error state.
948         assert(state_ == StateEnum::ERROR);
949         return;
950       }
951
952       // Shutdown writes on the file descriptor
953       ::shutdown(fd_, SHUT_WR);
954
955       // Immediately fail all write requests
956       failAllWrites(socketShutdownForWritesEx);
957       return;
958     }
959     case StateEnum::CONNECTING:
960     {
961       // Set the SHUT_WRITE_PENDING flag.
962       // When the connection completes, it will check this flag,
963       // shutdown the write half of the socket, and then set SHUT_WRITE.
964       shutdownFlags_ |= SHUT_WRITE_PENDING;
965
966       // Immediately fail all write requests
967       failAllWrites(socketShutdownForWritesEx);
968       return;
969     }
970     case StateEnum::UNINIT:
971       // Callers normally shouldn't call shutdownWriteNow() before the socket
972       // even starts connecting.  Nonetheless, go ahead and set
973       // SHUT_WRITE_PENDING.  Once the socket eventually connects it will
974       // immediately shut down the write side of the socket.
975       shutdownFlags_ |= SHUT_WRITE_PENDING;
976       return;
977     case StateEnum::CLOSED:
978     case StateEnum::ERROR:
979       // We should never get here.  SHUT_WRITE should always be set
980       // in STATE_CLOSED and STATE_ERROR.
981       VLOG(4) << "AsyncSocket::shutdownWriteNow() (this=" << this
982                  << ", fd=" << fd_ << ") in unexpected state " << state_
983                  << " with SHUT_WRITE not set ("
984                  << std::hex << (int) shutdownFlags_ << ")";
985       assert(false);
986       return;
987   }
988
989   LOG(DFATAL) << "AsyncSocket::shutdownWriteNow() (this=" << this << ", fd="
990               << fd_ << ") called in unknown state " << state_;
991 }
992
993 bool AsyncSocket::readable() const {
994   if (fd_ == -1) {
995     return false;
996   }
997   struct pollfd fds[1];
998   fds[0].fd = fd_;
999   fds[0].events = POLLIN;
1000   fds[0].revents = 0;
1001   int rc = poll(fds, 1, 0);
1002   return rc == 1;
1003 }
1004
1005 bool AsyncSocket::isPending() const {
1006   return ioHandler_.isPending();
1007 }
1008
1009 bool AsyncSocket::hangup() const {
1010   if (fd_ == -1) {
1011     // sanity check, no one should ask for hangup if we are not connected.
1012     assert(false);
1013     return false;
1014   }
1015 #ifdef POLLRDHUP // Linux-only
1016   struct pollfd fds[1];
1017   fds[0].fd = fd_;
1018   fds[0].events = POLLRDHUP|POLLHUP;
1019   fds[0].revents = 0;
1020   poll(fds, 1, 0);
1021   return (fds[0].revents & (POLLRDHUP|POLLHUP)) != 0;
1022 #else
1023   return false;
1024 #endif
1025 }
1026
1027 bool AsyncSocket::good() const {
1028   return ((state_ == StateEnum::CONNECTING ||
1029           state_ == StateEnum::ESTABLISHED) &&
1030           (shutdownFlags_ == 0) && (eventBase_ != nullptr));
1031 }
1032
1033 bool AsyncSocket::error() const {
1034   return (state_ == StateEnum::ERROR);
1035 }
1036
1037 void AsyncSocket::attachEventBase(EventBase* eventBase) {
1038   VLOG(5) << "AsyncSocket::attachEventBase(this=" << this << ", fd=" << fd_
1039           << ", old evb=" << eventBase_ << ", new evb=" << eventBase
1040           << ", state=" << state_ << ", events="
1041           << std::hex << eventFlags_ << ")";
1042   assert(eventBase_ == nullptr);
1043   assert(eventBase->isInEventBaseThread());
1044
1045   eventBase_ = eventBase;
1046   ioHandler_.attachEventBase(eventBase);
1047   writeTimeout_.attachEventBase(eventBase);
1048 }
1049
1050 void AsyncSocket::detachEventBase() {
1051   VLOG(5) << "AsyncSocket::detachEventBase(this=" << this << ", fd=" << fd_
1052           << ", old evb=" << eventBase_ << ", state=" << state_
1053           << ", events=" << std::hex << eventFlags_ << ")";
1054   assert(eventBase_ != nullptr);
1055   assert(eventBase_->isInEventBaseThread());
1056
1057   eventBase_ = nullptr;
1058   ioHandler_.detachEventBase();
1059   writeTimeout_.detachEventBase();
1060 }
1061
1062 bool AsyncSocket::isDetachable() const {
1063   DCHECK(eventBase_ != nullptr);
1064   DCHECK(eventBase_->isInEventBaseThread());
1065
1066   return !ioHandler_.isHandlerRegistered() && !writeTimeout_.isScheduled();
1067 }
1068
1069 void AsyncSocket::getLocalAddress(folly::SocketAddress* address) const {
1070   if (!localAddr_.isInitialized()) {
1071     localAddr_.setFromLocalAddress(fd_);
1072   }
1073   *address = localAddr_;
1074 }
1075
1076 void AsyncSocket::getPeerAddress(folly::SocketAddress* address) const {
1077   if (!addr_.isInitialized()) {
1078     addr_.setFromPeerAddress(fd_);
1079   }
1080   *address = addr_;
1081 }
1082
1083 int AsyncSocket::setNoDelay(bool noDelay) {
1084   if (fd_ < 0) {
1085     VLOG(4) << "AsyncSocket::setNoDelay() called on non-open socket "
1086                << this << "(state=" << state_ << ")";
1087     return EINVAL;
1088
1089   }
1090
1091   int value = noDelay ? 1 : 0;
1092   if (setsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &value, sizeof(value)) != 0) {
1093     int errnoCopy = errno;
1094     VLOG(2) << "failed to update TCP_NODELAY option on AsyncSocket "
1095             << this << " (fd=" << fd_ << ", state=" << state_ << "): "
1096             << strerror(errnoCopy);
1097     return errnoCopy;
1098   }
1099
1100   return 0;
1101 }
1102
1103 int AsyncSocket::setCongestionFlavor(const std::string &cname) {
1104
1105   #ifndef TCP_CONGESTION
1106   #define TCP_CONGESTION  13
1107   #endif
1108
1109   if (fd_ < 0) {
1110     VLOG(4) << "AsyncSocket::setCongestionFlavor() called on non-open "
1111                << "socket " << this << "(state=" << state_ << ")";
1112     return EINVAL;
1113
1114   }
1115
1116   if (setsockopt(fd_, IPPROTO_TCP, TCP_CONGESTION, cname.c_str(),
1117         cname.length() + 1) != 0) {
1118     int errnoCopy = errno;
1119     VLOG(2) << "failed to update TCP_CONGESTION option on AsyncSocket "
1120             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1121             << strerror(errnoCopy);
1122     return errnoCopy;
1123   }
1124
1125   return 0;
1126 }
1127
1128 int AsyncSocket::setQuickAck(bool quickack) {
1129   if (fd_ < 0) {
1130     VLOG(4) << "AsyncSocket::setQuickAck() called on non-open socket "
1131                << this << "(state=" << state_ << ")";
1132     return EINVAL;
1133
1134   }
1135
1136 #ifdef TCP_QUICKACK // Linux-only
1137   int value = quickack ? 1 : 0;
1138   if (setsockopt(fd_, IPPROTO_TCP, TCP_QUICKACK, &value, sizeof(value)) != 0) {
1139     int errnoCopy = errno;
1140     VLOG(2) << "failed to update TCP_QUICKACK option on AsyncSocket"
1141             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1142             << strerror(errnoCopy);
1143     return errnoCopy;
1144   }
1145
1146   return 0;
1147 #else
1148   return ENOSYS;
1149 #endif
1150 }
1151
1152 int AsyncSocket::setSendBufSize(size_t bufsize) {
1153   if (fd_ < 0) {
1154     VLOG(4) << "AsyncSocket::setSendBufSize() called on non-open socket "
1155                << this << "(state=" << state_ << ")";
1156     return EINVAL;
1157   }
1158
1159   if (setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &bufsize, sizeof(bufsize)) !=0) {
1160     int errnoCopy = errno;
1161     VLOG(2) << "failed to update SO_SNDBUF option on AsyncSocket"
1162             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1163             << strerror(errnoCopy);
1164     return errnoCopy;
1165   }
1166
1167   return 0;
1168 }
1169
1170 int AsyncSocket::setRecvBufSize(size_t bufsize) {
1171   if (fd_ < 0) {
1172     VLOG(4) << "AsyncSocket::setRecvBufSize() called on non-open socket "
1173                << this << "(state=" << state_ << ")";
1174     return EINVAL;
1175   }
1176
1177   if (setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &bufsize, sizeof(bufsize)) !=0) {
1178     int errnoCopy = errno;
1179     VLOG(2) << "failed to update SO_RCVBUF option on AsyncSocket"
1180             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1181             << strerror(errnoCopy);
1182     return errnoCopy;
1183   }
1184
1185   return 0;
1186 }
1187
1188 int AsyncSocket::setTCPProfile(int profd) {
1189   if (fd_ < 0) {
1190     VLOG(4) << "AsyncSocket::setTCPProfile() called on non-open socket "
1191                << this << "(state=" << state_ << ")";
1192     return EINVAL;
1193   }
1194
1195   if (setsockopt(fd_, SOL_SOCKET, SO_SET_NAMESPACE, &profd, sizeof(int)) !=0) {
1196     int errnoCopy = errno;
1197     VLOG(2) << "failed to set socket namespace option on AsyncSocket"
1198             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1199             << strerror(errnoCopy);
1200     return errnoCopy;
1201   }
1202
1203   return 0;
1204 }
1205
1206 void AsyncSocket::setPersistentCork(bool cork) {
1207   if (setCork(cork) == 0) {
1208     persistentCork_ = cork;
1209   }
1210 }
1211
1212 int AsyncSocket::setCork(bool cork) {
1213 #ifdef TCP_CORK
1214   if (fd_ < 0) {
1215     VLOG(4) << "AsyncSocket::setCork() called on non-open socket "
1216             << this << "(stats=" << state_ << ")";
1217     return EINVAL;
1218   }
1219
1220   if (corked_ == cork) {
1221     return 0;
1222   }
1223
1224   int flag = cork ? 1 : 0;
1225   if (setsockopt(fd_, IPPROTO_TCP, TCP_CORK, &flag, sizeof(flag)) != 0) {
1226     int errnoCopy = errno;
1227     VLOG(2) << "faield to turn on TCP_CORK option on AsyncSocket"
1228             << this << "(fd=" << fd_ << ", state=" << state_ << "):"
1229             << folly::errnoStr(errnoCopy);
1230     return errnoCopy;
1231   }
1232   corked_ = cork;
1233 #endif
1234   return 0;
1235 }
1236
1237 void AsyncSocket::ioReady(uint16_t events) noexcept {
1238   VLOG(7) << "AsyncSocket::ioRead() this=" << this << ", fd" << fd_
1239           << ", events=" << std::hex << events << ", state=" << state_;
1240   DestructorGuard dg(this);
1241   assert(events & EventHandler::READ_WRITE);
1242   assert(eventBase_->isInEventBaseThread());
1243
1244   uint16_t relevantEvents = events & EventHandler::READ_WRITE;
1245   if (relevantEvents == EventHandler::READ) {
1246     handleRead();
1247   } else if (relevantEvents == EventHandler::WRITE) {
1248     handleWrite();
1249   } else if (relevantEvents == EventHandler::READ_WRITE) {
1250     EventBase* originalEventBase = eventBase_;
1251     // If both read and write events are ready, process writes first.
1252     handleWrite();
1253
1254     // Return now if handleWrite() detached us from our EventBase
1255     if (eventBase_ != originalEventBase) {
1256       return;
1257     }
1258
1259     // Only call handleRead() if a read callback is still installed.
1260     // (It's possible that the read callback was uninstalled during
1261     // handleWrite().)
1262     if (readCallback_) {
1263       handleRead();
1264     }
1265   } else {
1266     VLOG(4) << "AsyncSocket::ioRead() called with unexpected events "
1267                << std::hex << events << "(this=" << this << ")";
1268     abort();
1269   }
1270 }
1271
1272 ssize_t AsyncSocket::performRead(void** buf,
1273                                  size_t* buflen,
1274                                  size_t* /* offset */) {
1275   VLOG(5) << "AsyncSocket::performRead() this=" << this
1276           << ", buf=" << *buf << ", buflen=" << *buflen;
1277
1278   int recvFlags = 0;
1279   if (peek_) {
1280     recvFlags |= MSG_PEEK;
1281   }
1282
1283   ssize_t bytes = recv(fd_, *buf, *buflen, MSG_DONTWAIT | recvFlags);
1284   if (bytes < 0) {
1285     if (errno == EAGAIN || errno == EWOULDBLOCK) {
1286       // No more data to read right now.
1287       return READ_BLOCKING;
1288     } else {
1289       return READ_ERROR;
1290     }
1291   } else {
1292     appBytesReceived_ += bytes;
1293     return bytes;
1294   }
1295 }
1296
1297 void AsyncSocket::prepareReadBuffer(void** buf, size_t* buflen) noexcept {
1298   // no matter what, buffer should be preapared for non-ssl socket
1299   CHECK(readCallback_);
1300   readCallback_->getReadBuffer(buf, buflen);
1301 }
1302
1303 void AsyncSocket::handleRead() noexcept {
1304   VLOG(5) << "AsyncSocket::handleRead() this=" << this << ", fd=" << fd_
1305           << ", state=" << state_;
1306   assert(state_ == StateEnum::ESTABLISHED);
1307   assert((shutdownFlags_ & SHUT_READ) == 0);
1308   assert(readCallback_ != nullptr);
1309   assert(eventFlags_ & EventHandler::READ);
1310
1311   // Loop until:
1312   // - a read attempt would block
1313   // - readCallback_ is uninstalled
1314   // - the number of loop iterations exceeds the optional maximum
1315   // - this AsyncSocket is moved to another EventBase
1316   //
1317   // When we invoke readDataAvailable() it may uninstall the readCallback_,
1318   // which is why need to check for it here.
1319   //
1320   // The last bullet point is slightly subtle.  readDataAvailable() may also
1321   // detach this socket from this EventBase.  However, before
1322   // readDataAvailable() returns another thread may pick it up, attach it to
1323   // a different EventBase, and install another readCallback_.  We need to
1324   // exit immediately after readDataAvailable() returns if the eventBase_ has
1325   // changed.  (The caller must perform some sort of locking to transfer the
1326   // AsyncSocket between threads properly.  This will be sufficient to ensure
1327   // that this thread sees the updated eventBase_ variable after
1328   // readDataAvailable() returns.)
1329   uint16_t numReads = 0;
1330   EventBase* originalEventBase = eventBase_;
1331   while (readCallback_ && eventBase_ == originalEventBase) {
1332     // Get the buffer to read into.
1333     void* buf = nullptr;
1334     size_t buflen = 0, offset = 0;
1335     try {
1336       prepareReadBuffer(&buf, &buflen);
1337       VLOG(5) << "prepareReadBuffer() buf=" << buf << ", buflen=" << buflen;
1338     } catch (const AsyncSocketException& ex) {
1339       return failRead(__func__, ex);
1340     } catch (const std::exception& ex) {
1341       AsyncSocketException tex(AsyncSocketException::BAD_ARGS,
1342                               string("ReadCallback::getReadBuffer() "
1343                                      "threw exception: ") +
1344                               ex.what());
1345       return failRead(__func__, tex);
1346     } catch (...) {
1347       AsyncSocketException ex(AsyncSocketException::BAD_ARGS,
1348                              "ReadCallback::getReadBuffer() threw "
1349                              "non-exception type");
1350       return failRead(__func__, ex);
1351     }
1352     if (!isBufferMovable_ && (buf == nullptr || buflen == 0)) {
1353       AsyncSocketException ex(AsyncSocketException::BAD_ARGS,
1354                              "ReadCallback::getReadBuffer() returned "
1355                              "empty buffer");
1356       return failRead(__func__, ex);
1357     }
1358
1359     // Perform the read
1360     ssize_t bytesRead = performRead(&buf, &buflen, &offset);
1361     VLOG(4) << "this=" << this << ", AsyncSocket::handleRead() got "
1362             << bytesRead << " bytes";
1363     if (bytesRead > 0) {
1364       if (!isBufferMovable_) {
1365         readCallback_->readDataAvailable(bytesRead);
1366       } else {
1367         CHECK(kOpenSslModeMoveBufferOwnership);
1368         VLOG(5) << "this=" << this << ", AsyncSocket::handleRead() got "
1369                 << "buf=" << buf << ", " << bytesRead << "/" << buflen
1370                 << ", offset=" << offset;
1371         auto readBuf = folly::IOBuf::takeOwnership(buf, buflen);
1372         readBuf->trimStart(offset);
1373         readBuf->trimEnd(buflen - offset - bytesRead);
1374         readCallback_->readBufferAvailable(std::move(readBuf));
1375       }
1376
1377       // Fall through and continue around the loop if the read
1378       // completely filled the available buffer.
1379       // Note that readCallback_ may have been uninstalled or changed inside
1380       // readDataAvailable().
1381       if (size_t(bytesRead) < buflen) {
1382         return;
1383       }
1384     } else if (bytesRead == READ_BLOCKING) {
1385         // No more data to read right now.
1386         return;
1387     } else if (bytesRead == READ_ERROR) {
1388       readErr_ = READ_ERROR;
1389       AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1390                              withAddr("recv() failed"), errno);
1391       return failRead(__func__, ex);
1392     } else {
1393       assert(bytesRead == READ_EOF);
1394       readErr_ = READ_EOF;
1395       // EOF
1396       shutdownFlags_ |= SHUT_READ;
1397       if (!updateEventRegistration(0, EventHandler::READ)) {
1398         // we've already been moved into STATE_ERROR
1399         assert(state_ == StateEnum::ERROR);
1400         assert(readCallback_ == nullptr);
1401         return;
1402       }
1403
1404       ReadCallback* callback = readCallback_;
1405       readCallback_ = nullptr;
1406       callback->readEOF();
1407       return;
1408     }
1409     if (maxReadsPerEvent_ && (++numReads >= maxReadsPerEvent_)) {
1410       if (readCallback_ != nullptr) {
1411         // We might still have data in the socket.
1412         // (e.g. see comment in AsyncSSLSocket::checkForImmediateRead)
1413         scheduleImmediateRead();
1414       }
1415       return;
1416     }
1417   }
1418 }
1419
1420 /**
1421  * This function attempts to write as much data as possible, until no more data
1422  * can be written.
1423  *
1424  * - If it sends all available data, it unregisters for write events, and stops
1425  *   the writeTimeout_.
1426  *
1427  * - If not all of the data can be sent immediately, it reschedules
1428  *   writeTimeout_ (if a non-zero timeout is set), and ensures the handler is
1429  *   registered for write events.
1430  */
1431 void AsyncSocket::handleWrite() noexcept {
1432   VLOG(5) << "AsyncSocket::handleWrite() this=" << this << ", fd=" << fd_
1433           << ", state=" << state_;
1434   if (state_ == StateEnum::CONNECTING) {
1435     handleConnect();
1436     return;
1437   }
1438
1439   // Normal write
1440   assert(state_ == StateEnum::ESTABLISHED);
1441   assert((shutdownFlags_ & SHUT_WRITE) == 0);
1442   assert(writeReqHead_ != nullptr);
1443
1444   // Loop until we run out of write requests,
1445   // or until this socket is moved to another EventBase.
1446   // (See the comment in handleRead() explaining how this can happen.)
1447   EventBase* originalEventBase = eventBase_;
1448   while (writeReqHead_ != nullptr && eventBase_ == originalEventBase) {
1449     if (!writeReqHead_->performWrite()) {
1450       AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1451                              withAddr("writev() failed"), errno);
1452       return failWrite(__func__, ex);
1453     } else if (writeReqHead_->isComplete()) {
1454       // We finished this request
1455       WriteRequest* req = writeReqHead_;
1456       writeReqHead_ = req->getNext();
1457
1458       if (writeReqHead_ == nullptr) {
1459         writeReqTail_ = nullptr;
1460         // This is the last write request.
1461         // Unregister for write events and cancel the send timer
1462         // before we invoke the callback.  We have to update the state properly
1463         // before calling the callback, since it may want to detach us from
1464         // the EventBase.
1465         if (eventFlags_ & EventHandler::WRITE) {
1466           if (!updateEventRegistration(0, EventHandler::WRITE)) {
1467             assert(state_ == StateEnum::ERROR);
1468             return;
1469           }
1470           // Stop the send timeout
1471           writeTimeout_.cancelTimeout();
1472         }
1473         assert(!writeTimeout_.isScheduled());
1474
1475         // If SHUT_WRITE_PENDING is set, we should shutdown the socket after
1476         // we finish sending the last write request.
1477         //
1478         // We have to do this before invoking writeSuccess(), since
1479         // writeSuccess() may detach us from our EventBase.
1480         if (shutdownFlags_ & SHUT_WRITE_PENDING) {
1481           assert(connectCallback_ == nullptr);
1482           shutdownFlags_ |= SHUT_WRITE;
1483
1484           if (shutdownFlags_ & SHUT_READ) {
1485             // Reads have already been shutdown.  Fully close the socket and
1486             // move to STATE_CLOSED.
1487             //
1488             // Note: This code currently moves us to STATE_CLOSED even if
1489             // close() hasn't ever been called.  This can occur if we have
1490             // received EOF from the peer and shutdownWrite() has been called
1491             // locally.  Should we bother staying in STATE_ESTABLISHED in this
1492             // case, until close() is actually called?  I can't think of a
1493             // reason why we would need to do so.  No other operations besides
1494             // calling close() or destroying the socket can be performed at
1495             // this point.
1496             assert(readCallback_ == nullptr);
1497             state_ = StateEnum::CLOSED;
1498             if (fd_ >= 0) {
1499               ioHandler_.changeHandlerFD(-1);
1500               doClose();
1501             }
1502           } else {
1503             // Reads are still enabled, so we are only doing a half-shutdown
1504             ::shutdown(fd_, SHUT_WR);
1505           }
1506         }
1507       }
1508
1509       // Invoke the callback
1510       WriteCallback* callback = req->getCallback();
1511       req->destroy();
1512       if (callback) {
1513         callback->writeSuccess();
1514       }
1515       // We'll continue around the loop, trying to write another request
1516     } else {
1517       // Partial write.
1518       if (bufferCallback_) {
1519         bufferCallback_->onEgressBuffered();
1520       }
1521       writeReqHead_->consume();
1522       // Stop after a partial write; it's highly likely that a subsequent write
1523       // attempt will just return EAGAIN.
1524       //
1525       // Ensure that we are registered for write events.
1526       if ((eventFlags_ & EventHandler::WRITE) == 0) {
1527         if (!updateEventRegistration(EventHandler::WRITE, 0)) {
1528           assert(state_ == StateEnum::ERROR);
1529           return;
1530         }
1531       }
1532
1533       // Reschedule the send timeout, since we have made some write progress.
1534       if (sendTimeout_ > 0) {
1535         if (!writeTimeout_.scheduleTimeout(sendTimeout_)) {
1536           AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1537               withAddr("failed to reschedule write timeout"));
1538           return failWrite(__func__, ex);
1539         }
1540       }
1541       return;
1542     }
1543   }
1544   if (!writeReqHead_ && bufferCallback_) {
1545     bufferCallback_->onEgressBufferCleared();
1546   }
1547 }
1548
1549 void AsyncSocket::checkForImmediateRead() noexcept {
1550   // We currently don't attempt to perform optimistic reads in AsyncSocket.
1551   // (However, note that some subclasses do override this method.)
1552   //
1553   // Simply calling handleRead() here would be bad, as this would call
1554   // readCallback_->getReadBuffer(), forcing the callback to allocate a read
1555   // buffer even though no data may be available.  This would waste lots of
1556   // memory, since the buffer will sit around unused until the socket actually
1557   // becomes readable.
1558   //
1559   // Checking if the socket is readable now also seems like it would probably
1560   // be a pessimism.  In most cases it probably wouldn't be readable, and we
1561   // would just waste an extra system call.  Even if it is readable, waiting to
1562   // find out from libevent on the next event loop doesn't seem that bad.
1563 }
1564
1565 void AsyncSocket::handleInitialReadWrite() noexcept {
1566   // Our callers should already be holding a DestructorGuard, but grab
1567   // one here just to make sure, in case one of our calling code paths ever
1568   // changes.
1569   DestructorGuard dg(this);
1570
1571   // If we have a readCallback_, make sure we enable read events.  We
1572   // may already be registered for reads if connectSuccess() set
1573   // the read calback.
1574   if (readCallback_ && !(eventFlags_ & EventHandler::READ)) {
1575     assert(state_ == StateEnum::ESTABLISHED);
1576     assert((shutdownFlags_ & SHUT_READ) == 0);
1577     if (!updateEventRegistration(EventHandler::READ, 0)) {
1578       assert(state_ == StateEnum::ERROR);
1579       return;
1580     }
1581     checkForImmediateRead();
1582   } else if (readCallback_ == nullptr) {
1583     // Unregister for read events.
1584     updateEventRegistration(0, EventHandler::READ);
1585   }
1586
1587   // If we have write requests pending, try to send them immediately.
1588   // Since we just finished accepting, there is a very good chance that we can
1589   // write without blocking.
1590   //
1591   // However, we only process them if EventHandler::WRITE is not already set,
1592   // which means that we're already blocked on a write attempt.  (This can
1593   // happen if connectSuccess() called write() before returning.)
1594   if (writeReqHead_ && !(eventFlags_ & EventHandler::WRITE)) {
1595     // Call handleWrite() to perform write processing.
1596     handleWrite();
1597   } else if (writeReqHead_ == nullptr) {
1598     // Unregister for write event.
1599     updateEventRegistration(0, EventHandler::WRITE);
1600   }
1601 }
1602
1603 void AsyncSocket::handleConnect() noexcept {
1604   VLOG(5) << "AsyncSocket::handleConnect() this=" << this << ", fd=" << fd_
1605           << ", state=" << state_;
1606   assert(state_ == StateEnum::CONNECTING);
1607   // SHUT_WRITE can never be set while we are still connecting;
1608   // SHUT_WRITE_PENDING may be set, be we only set SHUT_WRITE once the connect
1609   // finishes
1610   assert((shutdownFlags_ & SHUT_WRITE) == 0);
1611
1612   // In case we had a connect timeout, cancel the timeout
1613   writeTimeout_.cancelTimeout();
1614   // We don't use a persistent registration when waiting on a connect event,
1615   // so we have been automatically unregistered now.  Update eventFlags_ to
1616   // reflect reality.
1617   assert(eventFlags_ == EventHandler::WRITE);
1618   eventFlags_ = EventHandler::NONE;
1619
1620   // Call getsockopt() to check if the connect succeeded
1621   int error;
1622   socklen_t len = sizeof(error);
1623   int rv = getsockopt(fd_, SOL_SOCKET, SO_ERROR, &error, &len);
1624   if (rv != 0) {
1625     AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1626                            withAddr("error calling getsockopt() after connect"),
1627                            errno);
1628     VLOG(4) << "AsyncSocket::handleConnect(this=" << this << ", fd="
1629                << fd_ << " host=" << addr_.describe()
1630                << ") exception:" << ex.what();
1631     return failConnect(__func__, ex);
1632   }
1633
1634   if (error != 0) {
1635     AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
1636                            "connect failed", error);
1637     VLOG(1) << "AsyncSocket::handleConnect(this=" << this << ", fd="
1638             << fd_ << " host=" << addr_.describe()
1639             << ") exception: " << ex.what();
1640     return failConnect(__func__, ex);
1641   }
1642
1643   // Move into STATE_ESTABLISHED
1644   state_ = StateEnum::ESTABLISHED;
1645
1646   // If SHUT_WRITE_PENDING is set and we don't have any write requests to
1647   // perform, immediately shutdown the write half of the socket.
1648   if ((shutdownFlags_ & SHUT_WRITE_PENDING) && writeReqHead_ == nullptr) {
1649     // SHUT_READ shouldn't be set.  If close() is called on the socket while we
1650     // are still connecting we just abort the connect rather than waiting for
1651     // it to complete.
1652     assert((shutdownFlags_ & SHUT_READ) == 0);
1653     ::shutdown(fd_, SHUT_WR);
1654     shutdownFlags_ |= SHUT_WRITE;
1655   }
1656
1657   VLOG(7) << "AsyncSocket " << this << ": fd " << fd_
1658           << "successfully connected; state=" << state_;
1659
1660   // Remember the EventBase we are attached to, before we start invoking any
1661   // callbacks (since the callbacks may call detachEventBase()).
1662   EventBase* originalEventBase = eventBase_;
1663
1664   invokeConnectSuccess();
1665   // Note that the connect callback may have changed our state.
1666   // (set or unset the read callback, called write(), closed the socket, etc.)
1667   // The following code needs to handle these situations correctly.
1668   //
1669   // If the socket has been closed, readCallback_ and writeReqHead_ will
1670   // always be nullptr, so that will prevent us from trying to read or write.
1671   //
1672   // The main thing to check for is if eventBase_ is still originalEventBase.
1673   // If not, we have been detached from this event base, so we shouldn't
1674   // perform any more operations.
1675   if (eventBase_ != originalEventBase) {
1676     return;
1677   }
1678
1679   handleInitialReadWrite();
1680 }
1681
1682 void AsyncSocket::timeoutExpired() noexcept {
1683   VLOG(7) << "AsyncSocket " << this << ", fd " << fd_ << ": timeout expired: "
1684           << "state=" << state_ << ", events=" << std::hex << eventFlags_;
1685   DestructorGuard dg(this);
1686   assert(eventBase_->isInEventBaseThread());
1687
1688   if (state_ == StateEnum::CONNECTING) {
1689     // connect() timed out
1690     // Unregister for I/O events.
1691     AsyncSocketException ex(AsyncSocketException::TIMED_OUT,
1692                            "connect timed out");
1693     failConnect(__func__, ex);
1694   } else {
1695     // a normal write operation timed out
1696     assert(state_ == StateEnum::ESTABLISHED);
1697     AsyncSocketException ex(AsyncSocketException::TIMED_OUT, "write timed out");
1698     failWrite(__func__, ex);
1699   }
1700 }
1701
1702 ssize_t AsyncSocket::performWrite(const iovec* vec,
1703                                    uint32_t count,
1704                                    WriteFlags flags,
1705                                    uint32_t* countWritten,
1706                                    uint32_t* partialWritten) {
1707   // We use sendmsg() instead of writev() so that we can pass in MSG_NOSIGNAL
1708   // We correctly handle EPIPE errors, so we never want to receive SIGPIPE
1709   // (since it may terminate the program if the main program doesn't explicitly
1710   // ignore it).
1711   struct msghdr msg;
1712   msg.msg_name = nullptr;
1713   msg.msg_namelen = 0;
1714   msg.msg_iov = const_cast<iovec *>(vec);
1715   msg.msg_iovlen = std::min<size_t>(count, kIovMax);
1716   msg.msg_control = nullptr;
1717   msg.msg_controllen = 0;
1718   msg.msg_flags = 0;
1719
1720   int msg_flags = MSG_DONTWAIT;
1721
1722 #ifdef MSG_NOSIGNAL // Linux-only
1723   msg_flags |= MSG_NOSIGNAL;
1724   if (isSet(flags, WriteFlags::CORK)) {
1725     // MSG_MORE tells the kernel we have more data to send, so wait for us to
1726     // give it the rest of the data rather than immediately sending a partial
1727     // frame, even when TCP_NODELAY is enabled.
1728     msg_flags |= MSG_MORE;
1729   }
1730 #endif
1731   if (isSet(flags, WriteFlags::EOR)) {
1732     // marks that this is the last byte of a record (response)
1733     msg_flags |= MSG_EOR;
1734   }
1735   ssize_t totalWritten = ::sendmsg(fd_, &msg, msg_flags);
1736   if (totalWritten < 0) {
1737     if (errno == EAGAIN) {
1738       // TCP buffer is full; we can't write any more data right now.
1739       *countWritten = 0;
1740       *partialWritten = 0;
1741       return 0;
1742     }
1743     // error
1744     *countWritten = 0;
1745     *partialWritten = 0;
1746     return -1;
1747   }
1748
1749   appBytesWritten_ += totalWritten;
1750
1751   uint32_t bytesWritten;
1752   uint32_t n;
1753   for (bytesWritten = totalWritten, n = 0; n < count; ++n) {
1754     const iovec* v = vec + n;
1755     if (v->iov_len > bytesWritten) {
1756       // Partial write finished in the middle of this iovec
1757       *countWritten = n;
1758       *partialWritten = bytesWritten;
1759       return totalWritten;
1760     }
1761
1762     bytesWritten -= v->iov_len;
1763   }
1764
1765   assert(bytesWritten == 0);
1766   *countWritten = n;
1767   *partialWritten = 0;
1768   return totalWritten;
1769 }
1770
1771 /**
1772  * Re-register the EventHandler after eventFlags_ has changed.
1773  *
1774  * If an error occurs, fail() is called to move the socket into the error state
1775  * and call all currently installed callbacks.  After an error, the
1776  * AsyncSocket is completely unregistered.
1777  *
1778  * @return Returns true on succcess, or false on error.
1779  */
1780 bool AsyncSocket::updateEventRegistration() {
1781   VLOG(5) << "AsyncSocket::updateEventRegistration(this=" << this
1782           << ", fd=" << fd_ << ", evb=" << eventBase_ << ", state=" << state_
1783           << ", events=" << std::hex << eventFlags_;
1784   assert(eventBase_->isInEventBaseThread());
1785   if (eventFlags_ == EventHandler::NONE) {
1786     ioHandler_.unregisterHandler();
1787     return true;
1788   }
1789
1790   // Always register for persistent events, so we don't have to re-register
1791   // after being called back.
1792   if (!ioHandler_.registerHandler(eventFlags_ | EventHandler::PERSIST)) {
1793     eventFlags_ = EventHandler::NONE; // we're not registered after error
1794     AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1795         withAddr("failed to update AsyncSocket event registration"));
1796     fail("updateEventRegistration", ex);
1797     return false;
1798   }
1799
1800   return true;
1801 }
1802
1803 bool AsyncSocket::updateEventRegistration(uint16_t enable,
1804                                            uint16_t disable) {
1805   uint16_t oldFlags = eventFlags_;
1806   eventFlags_ |= enable;
1807   eventFlags_ &= ~disable;
1808   if (eventFlags_ == oldFlags) {
1809     return true;
1810   } else {
1811     return updateEventRegistration();
1812   }
1813 }
1814
1815 void AsyncSocket::startFail() {
1816   // startFail() should only be called once
1817   assert(state_ != StateEnum::ERROR);
1818   assert(getDestructorGuardCount() > 0);
1819   state_ = StateEnum::ERROR;
1820   // Ensure that SHUT_READ and SHUT_WRITE are set,
1821   // so all future attempts to read or write will be rejected
1822   shutdownFlags_ |= (SHUT_READ | SHUT_WRITE);
1823
1824   if (eventFlags_ != EventHandler::NONE) {
1825     eventFlags_ = EventHandler::NONE;
1826     ioHandler_.unregisterHandler();
1827   }
1828   writeTimeout_.cancelTimeout();
1829
1830   if (fd_ >= 0) {
1831     ioHandler_.changeHandlerFD(-1);
1832     doClose();
1833   }
1834 }
1835
1836 void AsyncSocket::finishFail() {
1837   assert(state_ == StateEnum::ERROR);
1838   assert(getDestructorGuardCount() > 0);
1839
1840   AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1841                          withAddr("socket closing after error"));
1842   invokeConnectErr(ex);
1843   failAllWrites(ex);
1844
1845   if (readCallback_) {
1846     ReadCallback* callback = readCallback_;
1847     readCallback_ = nullptr;
1848     callback->readErr(ex);
1849   }
1850 }
1851
1852 void AsyncSocket::fail(const char* fn, const AsyncSocketException& ex) {
1853   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1854              << state_ << " host=" << addr_.describe()
1855              << "): failed in " << fn << "(): "
1856              << ex.what();
1857   startFail();
1858   finishFail();
1859 }
1860
1861 void AsyncSocket::failConnect(const char* fn, const AsyncSocketException& ex) {
1862   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1863                << state_ << " host=" << addr_.describe()
1864                << "): failed while connecting in " << fn << "(): "
1865                << ex.what();
1866   startFail();
1867
1868   invokeConnectErr(ex);
1869   finishFail();
1870 }
1871
1872 void AsyncSocket::failRead(const char* fn, const AsyncSocketException& ex) {
1873   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1874                << state_ << " host=" << addr_.describe()
1875                << "): failed while reading in " << fn << "(): "
1876                << ex.what();
1877   startFail();
1878
1879   if (readCallback_ != nullptr) {
1880     ReadCallback* callback = readCallback_;
1881     readCallback_ = nullptr;
1882     callback->readErr(ex);
1883   }
1884
1885   finishFail();
1886 }
1887
1888 void AsyncSocket::failWrite(const char* fn, const AsyncSocketException& ex) {
1889   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1890                << state_ << " host=" << addr_.describe()
1891                << "): failed while writing in " << fn << "(): "
1892                << ex.what();
1893   startFail();
1894
1895   // Only invoke the first write callback, since the error occurred while
1896   // writing this request.  Let any other pending write callbacks be invoked in
1897   // finishFail().
1898   if (writeReqHead_ != nullptr) {
1899     WriteRequest* req = writeReqHead_;
1900     writeReqHead_ = req->getNext();
1901     WriteCallback* callback = req->getCallback();
1902     uint32_t bytesWritten = req->getTotalBytesWritten();
1903     req->destroy();
1904     if (callback) {
1905       callback->writeErr(bytesWritten, ex);
1906     }
1907   }
1908
1909   finishFail();
1910 }
1911
1912 void AsyncSocket::failWrite(const char* fn, WriteCallback* callback,
1913                              size_t bytesWritten,
1914                              const AsyncSocketException& ex) {
1915   // This version of failWrite() is used when the failure occurs before
1916   // we've added the callback to writeReqHead_.
1917   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1918              << state_ << " host=" << addr_.describe()
1919              <<"): failed while writing in " << fn << "(): "
1920              << ex.what();
1921   startFail();
1922
1923   if (callback != nullptr) {
1924     callback->writeErr(bytesWritten, ex);
1925   }
1926
1927   finishFail();
1928 }
1929
1930 void AsyncSocket::failAllWrites(const AsyncSocketException& ex) {
1931   // Invoke writeError() on all write callbacks.
1932   // This is used when writes are forcibly shutdown with write requests
1933   // pending, or when an error occurs with writes pending.
1934   while (writeReqHead_ != nullptr) {
1935     WriteRequest* req = writeReqHead_;
1936     writeReqHead_ = req->getNext();
1937     WriteCallback* callback = req->getCallback();
1938     if (callback) {
1939       callback->writeErr(req->getTotalBytesWritten(), ex);
1940     }
1941     req->destroy();
1942   }
1943 }
1944
1945 void AsyncSocket::invalidState(ConnectCallback* callback) {
1946   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_
1947              << "): connect() called in invalid state " << state_;
1948
1949   /*
1950    * The invalidState() methods don't use the normal failure mechanisms,
1951    * since we don't know what state we are in.  We don't want to call
1952    * startFail()/finishFail() recursively if we are already in the middle of
1953    * cleaning up.
1954    */
1955
1956   AsyncSocketException ex(AsyncSocketException::ALREADY_OPEN,
1957                          "connect() called with socket in invalid state");
1958   connectEndTime_ = std::chrono::steady_clock::now();
1959   if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
1960     if (callback) {
1961       callback->connectErr(ex);
1962     }
1963   } else {
1964     // We can't use failConnect() here since connectCallback_
1965     // may already be set to another callback.  Invoke this ConnectCallback
1966     // here; any other connectCallback_ will be invoked in finishFail()
1967     startFail();
1968     if (callback) {
1969       callback->connectErr(ex);
1970     }
1971     finishFail();
1972   }
1973 }
1974
1975 void AsyncSocket::invokeConnectErr(const AsyncSocketException& ex) {
1976   connectEndTime_ = std::chrono::steady_clock::now();
1977   if (connectCallback_) {
1978     ConnectCallback* callback = connectCallback_;
1979     connectCallback_ = nullptr;
1980     callback->connectErr(ex);
1981   }
1982 }
1983
1984 void AsyncSocket::invokeConnectSuccess() {
1985   connectEndTime_ = std::chrono::steady_clock::now();
1986   if (connectCallback_) {
1987     ConnectCallback* callback = connectCallback_;
1988     connectCallback_ = nullptr;
1989     callback->connectSuccess();
1990   }
1991 }
1992
1993 void AsyncSocket::invalidState(ReadCallback* callback) {
1994   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_
1995              << "): setReadCallback(" << callback
1996              << ") called in invalid state " << state_;
1997
1998   AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
1999                          "setReadCallback() called with socket in "
2000                          "invalid state");
2001   if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
2002     if (callback) {
2003       callback->readErr(ex);
2004     }
2005   } else {
2006     startFail();
2007     if (callback) {
2008       callback->readErr(ex);
2009     }
2010     finishFail();
2011   }
2012 }
2013
2014 void AsyncSocket::invalidState(WriteCallback* callback) {
2015   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_
2016              << "): write() called in invalid state " << state_;
2017
2018   AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
2019                          withAddr("write() called with socket in invalid state"));
2020   if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
2021     if (callback) {
2022       callback->writeErr(0, ex);
2023     }
2024   } else {
2025     startFail();
2026     if (callback) {
2027       callback->writeErr(0, ex);
2028     }
2029     finishFail();
2030   }
2031 }
2032
2033 void AsyncSocket::doClose() {
2034   if (fd_ == -1) return;
2035   if (shutdownSocketSet_) {
2036     shutdownSocketSet_->close(fd_);
2037   } else {
2038     ::close(fd_);
2039   }
2040   fd_ = -1;
2041 }
2042
2043 std::ostream& operator << (std::ostream& os,
2044                            const AsyncSocket::StateEnum& state) {
2045   os << static_cast<int>(state);
2046   return os;
2047 }
2048
2049 std::string AsyncSocket::withAddr(const std::string& s) {
2050   // Don't use addr_ directly because it may not be initialized
2051   // e.g. if constructed from fd
2052   folly::SocketAddress peer, local;
2053   try {
2054     getPeerAddress(&peer);
2055     getLocalAddress(&local);
2056   } catch (const std::exception&) {
2057     // ignore
2058   } catch (...) {
2059     // ignore
2060   }
2061   return s + " (peer=" + peer.describe() + ", local=" + local.describe() + ")";
2062 }
2063
2064 void AsyncSocket::setBufferCallback(BufferCallback* cb) {
2065   bufferCallback_ = cb;
2066 }
2067
2068 } // folly