Add SO_ZEROCOPY support
[folly.git] / folly / io / async / AsyncSocket.cpp
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/io/async/AsyncSocket.h>
18
19 #include <folly/ExceptionWrapper.h>
20 #include <folly/Portability.h>
21 #include <folly/SocketAddress.h>
22 #include <folly/io/Cursor.h>
23 #include <folly/io/IOBuf.h>
24 #include <folly/io/IOBufQueue.h>
25 #include <folly/portability/Fcntl.h>
26 #include <folly/portability/Sockets.h>
27 #include <folly/portability/SysUio.h>
28 #include <folly/portability/Unistd.h>
29
30 #include <boost/preprocessor/control/if.hpp>
31 #include <errno.h>
32 #include <limits.h>
33 #include <sys/types.h>
34 #include <thread>
35
36 using std::string;
37 using std::unique_ptr;
38
39 namespace fsp = folly::portability::sockets;
40
41 namespace folly {
42
43 static constexpr bool msgErrQueueSupported =
44 #ifdef MSG_ERRQUEUE
45     true;
46 #else
47     false;
48 #endif // MSG_ERRQUEUE
49
50 // static members initializers
51 const AsyncSocket::OptionMap AsyncSocket::emptyOptionMap;
52
53 const AsyncSocketException socketClosedLocallyEx(
54     AsyncSocketException::END_OF_FILE, "socket closed locally");
55 const AsyncSocketException socketShutdownForWritesEx(
56     AsyncSocketException::END_OF_FILE, "socket shutdown for writes");
57
58 // TODO: It might help performance to provide a version of BytesWriteRequest that
59 // users could derive from, so we can avoid the extra allocation for each call
60 // to write()/writev().  We could templatize TFramedAsyncChannel just like the
61 // protocols are currently templatized for transports.
62 //
63 // We would need the version for external users where they provide the iovec
64 // storage space, and only our internal version would allocate it at the end of
65 // the WriteRequest.
66
67 /* The default WriteRequest implementation, used for write(), writev() and
68  * writeChain()
69  *
70  * A new BytesWriteRequest operation is allocated on the heap for all write
71  * operations that cannot be completed immediately.
72  */
73 class AsyncSocket::BytesWriteRequest : public AsyncSocket::WriteRequest {
74  public:
75   static BytesWriteRequest* newRequest(AsyncSocket* socket,
76                                        WriteCallback* callback,
77                                        const iovec* ops,
78                                        uint32_t opCount,
79                                        uint32_t partialWritten,
80                                        uint32_t bytesWritten,
81                                        unique_ptr<IOBuf>&& ioBuf,
82                                        WriteFlags flags) {
83     assert(opCount > 0);
84     // Since we put a variable size iovec array at the end
85     // of each BytesWriteRequest, we have to manually allocate the memory.
86     void* buf = malloc(sizeof(BytesWriteRequest) +
87                        (opCount * sizeof(struct iovec)));
88     if (buf == nullptr) {
89       throw std::bad_alloc();
90     }
91
92     return new(buf) BytesWriteRequest(socket, callback, ops, opCount,
93                                       partialWritten, bytesWritten,
94                                       std::move(ioBuf), flags);
95   }
96
97   void destroy() override {
98     this->~BytesWriteRequest();
99     free(this);
100   }
101
102   WriteResult performWrite() override {
103     WriteFlags writeFlags = flags_;
104     if (getNext() != nullptr) {
105       writeFlags |= WriteFlags::CORK;
106     }
107
108     socket_->adjustZeroCopyFlags(getOps(), getOpCount(), writeFlags);
109
110     auto writeResult = socket_->performWrite(
111         getOps(), getOpCount(), writeFlags, &opsWritten_, &partialBytes_);
112     bytesWritten_ = writeResult.writeReturn > 0 ? writeResult.writeReturn : 0;
113     if (bytesWritten_) {
114       if (socket_->isZeroCopyRequest(writeFlags)) {
115         if (isComplete()) {
116           socket_->addZeroCopyBuff(std::move(ioBuf_));
117         } else {
118           socket_->addZeroCopyBuff(ioBuf_.get());
119         }
120       } else {
121         // this happens if at least one of the prev requests were sent
122         // with zero copy but not the last one
123         if (isComplete() && socket_->getZeroCopy() &&
124             socket_->containsZeroCopyBuff(ioBuf_.get())) {
125           socket_->setZeroCopyBuff(std::move(ioBuf_));
126         }
127       }
128     }
129     return writeResult;
130   }
131
132   bool isComplete() override {
133     return opsWritten_ == getOpCount();
134   }
135
136   void consume() override {
137     // Advance opIndex_ forward by opsWritten_
138     opIndex_ += opsWritten_;
139     assert(opIndex_ < opCount_);
140
141     if (!socket_->isZeroCopyRequest(flags_)) {
142       // If we've finished writing any IOBufs, release them
143       if (ioBuf_) {
144         for (uint32_t i = opsWritten_; i != 0; --i) {
145           assert(ioBuf_);
146           ioBuf_ = ioBuf_->pop();
147         }
148       }
149     }
150
151     // Move partialBytes_ forward into the current iovec buffer
152     struct iovec* currentOp = writeOps_ + opIndex_;
153     assert((partialBytes_ < currentOp->iov_len) || (currentOp->iov_len == 0));
154     currentOp->iov_base =
155       reinterpret_cast<uint8_t*>(currentOp->iov_base) + partialBytes_;
156     currentOp->iov_len -= partialBytes_;
157
158     // Increment the totalBytesWritten_ count by bytesWritten_;
159     assert(bytesWritten_ >= 0);
160     totalBytesWritten_ += uint32_t(bytesWritten_);
161   }
162
163  private:
164   BytesWriteRequest(AsyncSocket* socket,
165                     WriteCallback* callback,
166                     const struct iovec* ops,
167                     uint32_t opCount,
168                     uint32_t partialBytes,
169                     uint32_t bytesWritten,
170                     unique_ptr<IOBuf>&& ioBuf,
171                     WriteFlags flags)
172     : AsyncSocket::WriteRequest(socket, callback)
173     , opCount_(opCount)
174     , opIndex_(0)
175     , flags_(flags)
176     , ioBuf_(std::move(ioBuf))
177     , opsWritten_(0)
178     , partialBytes_(partialBytes)
179     , bytesWritten_(bytesWritten) {
180     memcpy(writeOps_, ops, sizeof(*ops) * opCount_);
181   }
182
183   // private destructor, to ensure callers use destroy()
184   ~BytesWriteRequest() override = default;
185
186   const struct iovec* getOps() const {
187     assert(opCount_ > opIndex_);
188     return writeOps_ + opIndex_;
189   }
190
191   uint32_t getOpCount() const {
192     assert(opCount_ > opIndex_);
193     return opCount_ - opIndex_;
194   }
195
196   uint32_t opCount_;            ///< number of entries in writeOps_
197   uint32_t opIndex_;            ///< current index into writeOps_
198   WriteFlags flags_;            ///< set for WriteFlags
199   unique_ptr<IOBuf> ioBuf_;     ///< underlying IOBuf, or nullptr if N/A
200
201   // for consume(), how much we wrote on the last write
202   uint32_t opsWritten_;         ///< complete ops written
203   uint32_t partialBytes_;       ///< partial bytes of incomplete op written
204   ssize_t bytesWritten_;        ///< bytes written altogether
205
206   struct iovec writeOps_[];     ///< write operation(s) list
207 };
208
209 int AsyncSocket::SendMsgParamsCallback::getDefaultFlags(
210     folly::WriteFlags flags,
211     bool zeroCopyEnabled) noexcept {
212   int msg_flags = MSG_DONTWAIT;
213
214 #ifdef MSG_NOSIGNAL // Linux-only
215   msg_flags |= MSG_NOSIGNAL;
216 #ifdef MSG_MORE
217   if (isSet(flags, WriteFlags::CORK)) {
218     // MSG_MORE tells the kernel we have more data to send, so wait for us to
219     // give it the rest of the data rather than immediately sending a partial
220     // frame, even when TCP_NODELAY is enabled.
221     msg_flags |= MSG_MORE;
222   }
223 #endif // MSG_MORE
224 #endif // MSG_NOSIGNAL
225   if (isSet(flags, WriteFlags::EOR)) {
226     // marks that this is the last byte of a record (response)
227     msg_flags |= MSG_EOR;
228   }
229
230   if (zeroCopyEnabled && isSet(flags, WriteFlags::WRITE_MSG_ZEROCOPY)) {
231     msg_flags |= MSG_ZEROCOPY;
232   }
233
234   return msg_flags;
235 }
236
237 namespace {
238 static AsyncSocket::SendMsgParamsCallback defaultSendMsgParamsCallback;
239 }
240
241 AsyncSocket::AsyncSocket()
242     : eventBase_(nullptr),
243       writeTimeout_(this, nullptr),
244       ioHandler_(this, nullptr),
245       immediateReadHandler_(this) {
246   VLOG(5) << "new AsyncSocket()";
247   init();
248 }
249
250 AsyncSocket::AsyncSocket(EventBase* evb)
251     : eventBase_(evb),
252       writeTimeout_(this, evb),
253       ioHandler_(this, evb),
254       immediateReadHandler_(this) {
255   VLOG(5) << "new AsyncSocket(" << this << ", evb=" << evb << ")";
256   init();
257 }
258
259 AsyncSocket::AsyncSocket(EventBase* evb,
260                            const folly::SocketAddress& address,
261                            uint32_t connectTimeout)
262   : AsyncSocket(evb) {
263   connect(nullptr, address, connectTimeout);
264 }
265
266 AsyncSocket::AsyncSocket(EventBase* evb,
267                            const std::string& ip,
268                            uint16_t port,
269                            uint32_t connectTimeout)
270   : AsyncSocket(evb) {
271   connect(nullptr, ip, port, connectTimeout);
272 }
273
274 AsyncSocket::AsyncSocket(EventBase* evb, int fd)
275     : eventBase_(evb),
276       writeTimeout_(this, evb),
277       ioHandler_(this, evb, fd),
278       immediateReadHandler_(this) {
279   VLOG(5) << "new AsyncSocket(" << this << ", evb=" << evb << ", fd="
280           << fd << ")";
281   init();
282   fd_ = fd;
283   setCloseOnExec();
284   state_ = StateEnum::ESTABLISHED;
285 }
286
287 AsyncSocket::AsyncSocket(AsyncSocket::UniquePtr oldAsyncSocket)
288     : AsyncSocket(oldAsyncSocket->getEventBase(), oldAsyncSocket->detachFd()) {
289   preReceivedData_ = std::move(oldAsyncSocket->preReceivedData_);
290 }
291
292 // init() method, since constructor forwarding isn't supported in most
293 // compilers yet.
294 void AsyncSocket::init() {
295   if (eventBase_) {
296     eventBase_->dcheckIsInEventBaseThread();
297   }
298   shutdownFlags_ = 0;
299   state_ = StateEnum::UNINIT;
300   eventFlags_ = EventHandler::NONE;
301   fd_ = -1;
302   sendTimeout_ = 0;
303   maxReadsPerEvent_ = 16;
304   connectCallback_ = nullptr;
305   errMessageCallback_ = nullptr;
306   readCallback_ = nullptr;
307   writeReqHead_ = nullptr;
308   writeReqTail_ = nullptr;
309   shutdownSocketSet_ = nullptr;
310   appBytesWritten_ = 0;
311   appBytesReceived_ = 0;
312   sendMsgParamCallback_ = &defaultSendMsgParamsCallback;
313 }
314
315 AsyncSocket::~AsyncSocket() {
316   VLOG(7) << "actual destruction of AsyncSocket(this=" << this
317           << ", evb=" << eventBase_ << ", fd=" << fd_
318           << ", state=" << state_ << ")";
319 }
320
321 void AsyncSocket::destroy() {
322   VLOG(5) << "AsyncSocket::destroy(this=" << this << ", evb=" << eventBase_
323           << ", fd=" << fd_ << ", state=" << state_;
324   // When destroy is called, close the socket immediately
325   closeNow();
326
327   // Then call DelayedDestruction::destroy() to take care of
328   // whether or not we need immediate or delayed destruction
329   DelayedDestruction::destroy();
330 }
331
332 int AsyncSocket::detachFd() {
333   VLOG(6) << "AsyncSocket::detachFd(this=" << this << ", fd=" << fd_
334           << ", evb=" << eventBase_ << ", state=" << state_
335           << ", events=" << std::hex << eventFlags_ << ")";
336   // Extract the fd, and set fd_ to -1 first, so closeNow() won't
337   // actually close the descriptor.
338   if (shutdownSocketSet_) {
339     shutdownSocketSet_->remove(fd_);
340   }
341   int fd = fd_;
342   fd_ = -1;
343   // Call closeNow() to invoke all pending callbacks with an error.
344   closeNow();
345   // Update the EventHandler to stop using this fd.
346   // This can only be done after closeNow() unregisters the handler.
347   ioHandler_.changeHandlerFD(-1);
348   return fd;
349 }
350
351 const folly::SocketAddress& AsyncSocket::anyAddress() {
352   static const folly::SocketAddress anyAddress =
353     folly::SocketAddress("0.0.0.0", 0);
354   return anyAddress;
355 }
356
357 void AsyncSocket::setShutdownSocketSet(ShutdownSocketSet* newSS) {
358   if (shutdownSocketSet_ == newSS) {
359     return;
360   }
361   if (shutdownSocketSet_ && fd_ != -1) {
362     shutdownSocketSet_->remove(fd_);
363   }
364   shutdownSocketSet_ = newSS;
365   if (shutdownSocketSet_ && fd_ != -1) {
366     shutdownSocketSet_->add(fd_);
367   }
368 }
369
370 void AsyncSocket::setCloseOnExec() {
371   int rv = fcntl(fd_, F_SETFD, FD_CLOEXEC);
372   if (rv != 0) {
373     auto errnoCopy = errno;
374     throw AsyncSocketException(
375         AsyncSocketException::INTERNAL_ERROR,
376         withAddr("failed to set close-on-exec flag"),
377         errnoCopy);
378   }
379 }
380
381 void AsyncSocket::connect(ConnectCallback* callback,
382                            const folly::SocketAddress& address,
383                            int timeout,
384                            const OptionMap &options,
385                            const folly::SocketAddress& bindAddr) noexcept {
386   DestructorGuard dg(this);
387   eventBase_->dcheckIsInEventBaseThread();
388
389   addr_ = address;
390
391   // Make sure we're in the uninitialized state
392   if (state_ != StateEnum::UNINIT) {
393     return invalidState(callback);
394   }
395
396   connectTimeout_ = std::chrono::milliseconds(timeout);
397   connectStartTime_ = std::chrono::steady_clock::now();
398   // Make connect end time at least >= connectStartTime.
399   connectEndTime_ = connectStartTime_;
400
401   assert(fd_ == -1);
402   state_ = StateEnum::CONNECTING;
403   connectCallback_ = callback;
404
405   sockaddr_storage addrStorage;
406   sockaddr* saddr = reinterpret_cast<sockaddr*>(&addrStorage);
407
408   try {
409     // Create the socket
410     // Technically the first parameter should actually be a protocol family
411     // constant (PF_xxx) rather than an address family (AF_xxx), but the
412     // distinction is mainly just historical.  In pretty much all
413     // implementations the PF_foo and AF_foo constants are identical.
414     fd_ = fsp::socket(address.getFamily(), SOCK_STREAM, 0);
415     if (fd_ < 0) {
416       auto errnoCopy = errno;
417       throw AsyncSocketException(
418           AsyncSocketException::INTERNAL_ERROR,
419           withAddr("failed to create socket"),
420           errnoCopy);
421     }
422     if (shutdownSocketSet_) {
423       shutdownSocketSet_->add(fd_);
424     }
425     ioHandler_.changeHandlerFD(fd_);
426
427     setCloseOnExec();
428
429     // Put the socket in non-blocking mode
430     int flags = fcntl(fd_, F_GETFL, 0);
431     if (flags == -1) {
432       auto errnoCopy = errno;
433       throw AsyncSocketException(
434           AsyncSocketException::INTERNAL_ERROR,
435           withAddr("failed to get socket flags"),
436           errnoCopy);
437     }
438     int rv = fcntl(fd_, F_SETFL, flags | O_NONBLOCK);
439     if (rv == -1) {
440       auto errnoCopy = errno;
441       throw AsyncSocketException(
442           AsyncSocketException::INTERNAL_ERROR,
443           withAddr("failed to put socket in non-blocking mode"),
444           errnoCopy);
445     }
446
447 #if !defined(MSG_NOSIGNAL) && defined(F_SETNOSIGPIPE)
448     // iOS and OS X don't support MSG_NOSIGNAL; set F_SETNOSIGPIPE instead
449     rv = fcntl(fd_, F_SETNOSIGPIPE, 1);
450     if (rv == -1) {
451       auto errnoCopy = errno;
452       throw AsyncSocketException(
453           AsyncSocketException::INTERNAL_ERROR,
454           "failed to enable F_SETNOSIGPIPE on socket",
455           errnoCopy);
456     }
457 #endif
458
459     // By default, turn on TCP_NODELAY
460     // If setNoDelay() fails, we continue anyway; this isn't a fatal error.
461     // setNoDelay() will log an error message if it fails.
462     // Also set the cached zeroCopyVal_ since it cannot be set earlier if the fd
463     // is not created
464     if (address.getFamily() != AF_UNIX) {
465       (void)setNoDelay(true);
466       setZeroCopy(zeroCopyVal_);
467     }
468
469     VLOG(5) << "AsyncSocket::connect(this=" << this << ", evb=" << eventBase_
470             << ", fd=" << fd_ << ", host=" << address.describe().c_str();
471
472     // bind the socket
473     if (bindAddr != anyAddress()) {
474       int one = 1;
475       if (setsockopt(fd_, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one))) {
476         auto errnoCopy = errno;
477         doClose();
478         throw AsyncSocketException(
479             AsyncSocketException::NOT_OPEN,
480             "failed to setsockopt prior to bind on " + bindAddr.describe(),
481             errnoCopy);
482       }
483
484       bindAddr.getAddress(&addrStorage);
485
486       if (bind(fd_, saddr, bindAddr.getActualSize()) != 0) {
487         auto errnoCopy = errno;
488         doClose();
489         throw AsyncSocketException(
490             AsyncSocketException::NOT_OPEN,
491             "failed to bind to async socket: " + bindAddr.describe(),
492             errnoCopy);
493       }
494     }
495
496     // Apply the additional options if any.
497     for (const auto& opt: options) {
498       rv = opt.first.apply(fd_, opt.second);
499       if (rv != 0) {
500         auto errnoCopy = errno;
501         throw AsyncSocketException(
502             AsyncSocketException::INTERNAL_ERROR,
503             withAddr("failed to set socket option"),
504             errnoCopy);
505       }
506     }
507
508     // Perform the connect()
509     address.getAddress(&addrStorage);
510
511     if (tfoEnabled_) {
512       state_ = StateEnum::FAST_OPEN;
513       tfoAttempted_ = true;
514     } else {
515       if (socketConnect(saddr, addr_.getActualSize()) < 0) {
516         return;
517       }
518     }
519
520     // If we're still here the connect() succeeded immediately.
521     // Fall through to call the callback outside of this try...catch block
522   } catch (const AsyncSocketException& ex) {
523     return failConnect(__func__, ex);
524   } catch (const std::exception& ex) {
525     // shouldn't happen, but handle it just in case
526     VLOG(4) << "AsyncSocket::connect(this=" << this << ", fd=" << fd_
527                << "): unexpected " << typeid(ex).name() << " exception: "
528                << ex.what();
529     AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR,
530                             withAddr(string("unexpected exception: ") +
531                                      ex.what()));
532     return failConnect(__func__, tex);
533   }
534
535   // The connection succeeded immediately
536   // The read callback may not have been set yet, and no writes may be pending
537   // yet, so we don't have to register for any events at the moment.
538   VLOG(8) << "AsyncSocket::connect succeeded immediately; this=" << this;
539   assert(errMessageCallback_ == nullptr);
540   assert(readCallback_ == nullptr);
541   assert(writeReqHead_ == nullptr);
542   if (state_ != StateEnum::FAST_OPEN) {
543     state_ = StateEnum::ESTABLISHED;
544   }
545   invokeConnectSuccess();
546 }
547
548 int AsyncSocket::socketConnect(const struct sockaddr* saddr, socklen_t len) {
549 #if __linux__
550   if (noTransparentTls_) {
551     // Ignore return value, errors are ok
552     setsockopt(fd_, SOL_SOCKET, SO_NO_TRANSPARENT_TLS, nullptr, 0);
553   }
554   if (noTSocks_) {
555     VLOG(4) << "Disabling TSOCKS for fd " << fd_;
556     // Ignore return value, errors are ok
557     setsockopt(fd_, SOL_SOCKET, SO_NO_TSOCKS, nullptr, 0);
558   }
559 #endif
560   int rv = fsp::connect(fd_, saddr, len);
561   if (rv < 0) {
562     auto errnoCopy = errno;
563     if (errnoCopy == EINPROGRESS) {
564       scheduleConnectTimeout();
565       registerForConnectEvents();
566     } else {
567       throw AsyncSocketException(
568           AsyncSocketException::NOT_OPEN,
569           "connect failed (immediately)",
570           errnoCopy);
571     }
572   }
573   return rv;
574 }
575
576 void AsyncSocket::scheduleConnectTimeout() {
577   // Connection in progress.
578   auto timeout = connectTimeout_.count();
579   if (timeout > 0) {
580     // Start a timer in case the connection takes too long.
581     if (!writeTimeout_.scheduleTimeout(uint32_t(timeout))) {
582       throw AsyncSocketException(
583           AsyncSocketException::INTERNAL_ERROR,
584           withAddr("failed to schedule AsyncSocket connect timeout"));
585     }
586   }
587 }
588
589 void AsyncSocket::registerForConnectEvents() {
590   // Register for write events, so we'll
591   // be notified when the connection finishes/fails.
592   // Note that we don't register for a persistent event here.
593   assert(eventFlags_ == EventHandler::NONE);
594   eventFlags_ = EventHandler::WRITE;
595   if (!ioHandler_.registerHandler(eventFlags_)) {
596     throw AsyncSocketException(
597         AsyncSocketException::INTERNAL_ERROR,
598         withAddr("failed to register AsyncSocket connect handler"));
599   }
600 }
601
602 void AsyncSocket::connect(ConnectCallback* callback,
603                            const string& ip, uint16_t port,
604                            int timeout,
605                            const OptionMap &options) noexcept {
606   DestructorGuard dg(this);
607   try {
608     connectCallback_ = callback;
609     connect(callback, folly::SocketAddress(ip, port), timeout, options);
610   } catch (const std::exception& ex) {
611     AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR,
612                             ex.what());
613     return failConnect(__func__, tex);
614   }
615 }
616
617 void AsyncSocket::cancelConnect() {
618   connectCallback_ = nullptr;
619   if (state_ == StateEnum::CONNECTING || state_ == StateEnum::FAST_OPEN) {
620     closeNow();
621   }
622 }
623
624 void AsyncSocket::setSendTimeout(uint32_t milliseconds) {
625   sendTimeout_ = milliseconds;
626   if (eventBase_) {
627     eventBase_->dcheckIsInEventBaseThread();
628   }
629
630   // If we are currently pending on write requests, immediately update
631   // writeTimeout_ with the new value.
632   if ((eventFlags_ & EventHandler::WRITE) &&
633       (state_ != StateEnum::CONNECTING && state_ != StateEnum::FAST_OPEN)) {
634     assert(state_ == StateEnum::ESTABLISHED);
635     assert((shutdownFlags_ & SHUT_WRITE) == 0);
636     if (sendTimeout_ > 0) {
637       if (!writeTimeout_.scheduleTimeout(sendTimeout_)) {
638         AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
639             withAddr("failed to reschedule send timeout in setSendTimeout"));
640         return failWrite(__func__, ex);
641       }
642     } else {
643       writeTimeout_.cancelTimeout();
644     }
645   }
646 }
647
648 void AsyncSocket::setErrMessageCB(ErrMessageCallback* callback) {
649   VLOG(6) << "AsyncSocket::setErrMessageCB() this=" << this
650           << ", fd=" << fd_ << ", callback=" << callback
651           << ", state=" << state_;
652
653   // Short circuit if callback is the same as the existing errMessageCallback_.
654   if (callback == errMessageCallback_) {
655     return;
656   }
657
658   if (!msgErrQueueSupported) {
659       // Per-socket error message queue is not supported on this platform.
660       return invalidState(callback);
661   }
662
663   DestructorGuard dg(this);
664   eventBase_->dcheckIsInEventBaseThread();
665
666   if (callback == nullptr) {
667     // We should be able to reset the callback regardless of the
668     // socket state. It's important to have a reliable callback
669     // cancellation mechanism.
670     errMessageCallback_ = callback;
671     return;
672   }
673
674   switch ((StateEnum)state_) {
675     case StateEnum::CONNECTING:
676     case StateEnum::FAST_OPEN:
677     case StateEnum::ESTABLISHED: {
678       errMessageCallback_ = callback;
679       return;
680     }
681     case StateEnum::CLOSED:
682     case StateEnum::ERROR:
683       // We should never reach here.  SHUT_READ should always be set
684       // if we are in STATE_CLOSED or STATE_ERROR.
685       assert(false);
686       return invalidState(callback);
687     case StateEnum::UNINIT:
688       // We do not allow setReadCallback() to be called before we start
689       // connecting.
690       return invalidState(callback);
691   }
692
693   // We don't put a default case in the switch statement, so that the compiler
694   // will warn us to update the switch statement if a new state is added.
695   return invalidState(callback);
696 }
697
698 AsyncSocket::ErrMessageCallback* AsyncSocket::getErrMessageCallback() const {
699   return errMessageCallback_;
700 }
701
702 void AsyncSocket::setSendMsgParamCB(SendMsgParamsCallback* callback) {
703   sendMsgParamCallback_ = callback;
704 }
705
706 AsyncSocket::SendMsgParamsCallback* AsyncSocket::getSendMsgParamsCB() const {
707   return sendMsgParamCallback_;
708 }
709
710 void AsyncSocket::setReadCB(ReadCallback *callback) {
711   VLOG(6) << "AsyncSocket::setReadCallback() this=" << this << ", fd=" << fd_
712           << ", callback=" << callback << ", state=" << state_;
713
714   // Short circuit if callback is the same as the existing readCallback_.
715   //
716   // Note that this is needed for proper functioning during some cleanup cases.
717   // During cleanup we allow setReadCallback(nullptr) to be called even if the
718   // read callback is already unset and we have been detached from an event
719   // base.  This check prevents us from asserting
720   // eventBase_->isInEventBaseThread() when eventBase_ is nullptr.
721   if (callback == readCallback_) {
722     return;
723   }
724
725   /* We are removing a read callback */
726   if (callback == nullptr &&
727       immediateReadHandler_.isLoopCallbackScheduled()) {
728     immediateReadHandler_.cancelLoopCallback();
729   }
730
731   if (shutdownFlags_ & SHUT_READ) {
732     // Reads have already been shut down on this socket.
733     //
734     // Allow setReadCallback(nullptr) to be called in this case, but don't
735     // allow a new callback to be set.
736     //
737     // For example, setReadCallback(nullptr) can happen after an error if we
738     // invoke some other error callback before invoking readError().  The other
739     // error callback that is invoked first may go ahead and clear the read
740     // callback before we get a chance to invoke readError().
741     if (callback != nullptr) {
742       return invalidState(callback);
743     }
744     assert((eventFlags_ & EventHandler::READ) == 0);
745     readCallback_ = nullptr;
746     return;
747   }
748
749   DestructorGuard dg(this);
750   eventBase_->dcheckIsInEventBaseThread();
751
752   switch ((StateEnum)state_) {
753     case StateEnum::CONNECTING:
754     case StateEnum::FAST_OPEN:
755       // For convenience, we allow the read callback to be set while we are
756       // still connecting.  We just store the callback for now.  Once the
757       // connection completes we'll register for read events.
758       readCallback_ = callback;
759       return;
760     case StateEnum::ESTABLISHED:
761     {
762       readCallback_ = callback;
763       uint16_t oldFlags = eventFlags_;
764       if (readCallback_) {
765         eventFlags_ |= EventHandler::READ;
766       } else {
767         eventFlags_ &= ~EventHandler::READ;
768       }
769
770       // Update our registration if our flags have changed
771       if (eventFlags_ != oldFlags) {
772         // We intentionally ignore the return value here.
773         // updateEventRegistration() will move us into the error state if it
774         // fails, and we don't need to do anything else here afterwards.
775         (void)updateEventRegistration();
776       }
777
778       if (readCallback_) {
779         checkForImmediateRead();
780       }
781       return;
782     }
783     case StateEnum::CLOSED:
784     case StateEnum::ERROR:
785       // We should never reach here.  SHUT_READ should always be set
786       // if we are in STATE_CLOSED or STATE_ERROR.
787       assert(false);
788       return invalidState(callback);
789     case StateEnum::UNINIT:
790       // We do not allow setReadCallback() to be called before we start
791       // connecting.
792       return invalidState(callback);
793   }
794
795   // We don't put a default case in the switch statement, so that the compiler
796   // will warn us to update the switch statement if a new state is added.
797   return invalidState(callback);
798 }
799
800 AsyncSocket::ReadCallback* AsyncSocket::getReadCallback() const {
801   return readCallback_;
802 }
803
804 bool AsyncSocket::setZeroCopy(bool enable) {
805   if (msgErrQueueSupported) {
806     zeroCopyVal_ = enable;
807
808     if (fd_ < 0) {
809       return false;
810     }
811
812     int val = enable ? 1 : 0;
813     int ret = setsockopt(fd_, SOL_SOCKET, SO_ZEROCOPY, &val, sizeof(val));
814
815     // if enable == false, set zeroCopyEnabled_ = false regardless
816     // if SO_ZEROCOPY is set or not
817     if (!enable) {
818       zeroCopyEnabled_ = enable;
819       return true;
820     }
821
822     /* if the setsockopt failed, try to see if the socket inherited the flag
823      * since we cannot set SO_ZEROCOPY on a socket s = accept
824      */
825     if (ret) {
826       val = 0;
827       socklen_t optlen = sizeof(val);
828       ret = getsockopt(fd_, SOL_SOCKET, SO_ZEROCOPY, &val, &optlen);
829
830       if (!ret) {
831         enable = val ? true : false;
832       }
833     }
834
835     if (!ret) {
836       zeroCopyEnabled_ = enable;
837
838       return true;
839     }
840   }
841
842   return false;
843 }
844
845 void AsyncSocket::setZeroCopyWriteChainThreshold(size_t threshold) {
846   zeroCopyWriteChainThreshold_ = threshold;
847 }
848
849 bool AsyncSocket::isZeroCopyRequest(WriteFlags flags) {
850   return (zeroCopyEnabled_ && isSet(flags, WriteFlags::WRITE_MSG_ZEROCOPY));
851 }
852
853 void AsyncSocket::adjustZeroCopyFlags(
854     folly::IOBuf* buf,
855     folly::WriteFlags& flags) {
856   if (zeroCopyEnabled_ && zeroCopyWriteChainThreshold_ && buf) {
857     if (buf->computeChainDataLength() >= zeroCopyWriteChainThreshold_) {
858       flags |= folly::WriteFlags::WRITE_MSG_ZEROCOPY;
859     } else {
860       flags = unSet(flags, folly::WriteFlags::WRITE_MSG_ZEROCOPY);
861     }
862   }
863 }
864
865 void AsyncSocket::adjustZeroCopyFlags(
866     const iovec* vec,
867     uint32_t count,
868     folly::WriteFlags& flags) {
869   if (zeroCopyEnabled_ && zeroCopyWriteChainThreshold_) {
870     count = std::min<uint32_t>(count, kIovMax);
871     size_t sum = 0;
872     for (uint32_t i = 0; i < count; ++i) {
873       const iovec* v = vec + i;
874       sum += v->iov_len;
875     }
876
877     if (sum >= zeroCopyWriteChainThreshold_) {
878       flags |= folly::WriteFlags::WRITE_MSG_ZEROCOPY;
879     } else {
880       flags = unSet(flags, folly::WriteFlags::WRITE_MSG_ZEROCOPY);
881     }
882   }
883 }
884
885 void AsyncSocket::addZeroCopyBuff(std::unique_ptr<folly::IOBuf>&& buf) {
886   uint32_t id = getNextZeroCopyBuffId();
887   folly::IOBuf* ptr = buf.get();
888
889   idZeroCopyBufPtrMap_[id] = ptr;
890   auto& p = idZeroCopyBufPtrToBufMap_[ptr];
891   p.first++;
892   CHECK(p.second.get() == nullptr);
893   p.second = std::move(buf);
894 }
895
896 void AsyncSocket::addZeroCopyBuff(folly::IOBuf* ptr) {
897   uint32_t id = getNextZeroCopyBuffId();
898   idZeroCopyBufPtrMap_[id] = ptr;
899
900   idZeroCopyBufPtrToBufMap_[ptr].first++;
901 }
902
903 void AsyncSocket::releaseZeroCopyBuff(uint32_t id) {
904   auto iter = idZeroCopyBufPtrMap_.find(id);
905   CHECK(iter != idZeroCopyBufPtrMap_.end());
906   auto ptr = iter->second;
907   auto iter1 = idZeroCopyBufPtrToBufMap_.find(ptr);
908   CHECK(iter1 != idZeroCopyBufPtrToBufMap_.end());
909   if (0 == --iter1->second.first) {
910     idZeroCopyBufPtrToBufMap_.erase(iter1);
911   }
912 }
913
914 void AsyncSocket::setZeroCopyBuff(std::unique_ptr<folly::IOBuf>&& buf) {
915   folly::IOBuf* ptr = buf.get();
916   auto& p = idZeroCopyBufPtrToBufMap_[ptr];
917   CHECK(p.second.get() == nullptr);
918
919   p.second = std::move(buf);
920 }
921
922 bool AsyncSocket::containsZeroCopyBuff(folly::IOBuf* ptr) {
923   return (
924       idZeroCopyBufPtrToBufMap_.find(ptr) != idZeroCopyBufPtrToBufMap_.end());
925 }
926
927 bool AsyncSocket::isZeroCopyMsg(const cmsghdr& cmsg) const {
928 #ifdef MSG_ERRQUEUE
929   if (zeroCopyEnabled_ &&
930       ((cmsg.cmsg_level == SOL_IP && cmsg.cmsg_type == IP_RECVERR) ||
931        (cmsg.cmsg_level == SOL_IPV6 && cmsg.cmsg_type == IPV6_RECVERR))) {
932     const struct sock_extended_err* serr =
933         reinterpret_cast<const struct sock_extended_err*>(CMSG_DATA(&cmsg));
934     return (
935         (serr->ee_errno == 0) && (serr->ee_origin == SO_EE_ORIGIN_ZEROCOPY));
936   }
937 #endif
938   return false;
939 }
940
941 void AsyncSocket::processZeroCopyMsg(const cmsghdr& cmsg) {
942 #ifdef MSG_ERRQUEUE
943   const struct sock_extended_err* serr =
944       reinterpret_cast<const struct sock_extended_err*>(CMSG_DATA(&cmsg));
945   uint32_t hi = serr->ee_data;
946   uint32_t lo = serr->ee_info;
947
948   for (uint32_t i = lo; i <= hi; i++) {
949     releaseZeroCopyBuff(i);
950   }
951 #endif
952 }
953
954 void AsyncSocket::write(WriteCallback* callback,
955                          const void* buf, size_t bytes, WriteFlags flags) {
956   iovec op;
957   op.iov_base = const_cast<void*>(buf);
958   op.iov_len = bytes;
959   writeImpl(callback, &op, 1, unique_ptr<IOBuf>(), flags);
960 }
961
962 void AsyncSocket::writev(WriteCallback* callback,
963                           const iovec* vec,
964                           size_t count,
965                           WriteFlags flags) {
966   writeImpl(callback, vec, count, unique_ptr<IOBuf>(), flags);
967 }
968
969 void AsyncSocket::writeChain(WriteCallback* callback, unique_ptr<IOBuf>&& buf,
970                               WriteFlags flags) {
971   adjustZeroCopyFlags(buf.get(), flags);
972
973   constexpr size_t kSmallSizeMax = 64;
974   size_t count = buf->countChainElements();
975   if (count <= kSmallSizeMax) {
976     // suppress "warning: variable length array 'vec' is used [-Wvla]"
977     FOLLY_PUSH_WARNING
978     FOLLY_GCC_DISABLE_WARNING("-Wvla")
979     iovec vec[BOOST_PP_IF(FOLLY_HAVE_VLA, count, kSmallSizeMax)];
980     FOLLY_POP_WARNING
981
982     writeChainImpl(callback, vec, count, std::move(buf), flags);
983   } else {
984     iovec* vec = new iovec[count];
985     writeChainImpl(callback, vec, count, std::move(buf), flags);
986     delete[] vec;
987   }
988 }
989
990 void AsyncSocket::writeChainImpl(WriteCallback* callback, iovec* vec,
991     size_t count, unique_ptr<IOBuf>&& buf, WriteFlags flags) {
992   size_t veclen = buf->fillIov(vec, count);
993   writeImpl(callback, vec, veclen, std::move(buf), flags);
994 }
995
996 void AsyncSocket::writeImpl(WriteCallback* callback, const iovec* vec,
997                              size_t count, unique_ptr<IOBuf>&& buf,
998                              WriteFlags flags) {
999   VLOG(6) << "AsyncSocket::writev() this=" << this << ", fd=" << fd_
1000           << ", callback=" << callback << ", count=" << count
1001           << ", state=" << state_;
1002   DestructorGuard dg(this);
1003   unique_ptr<IOBuf>ioBuf(std::move(buf));
1004   eventBase_->dcheckIsInEventBaseThread();
1005
1006   if (shutdownFlags_ & (SHUT_WRITE | SHUT_WRITE_PENDING)) {
1007     // No new writes may be performed after the write side of the socket has
1008     // been shutdown.
1009     //
1010     // We could just call callback->writeError() here to fail just this write.
1011     // However, fail hard and use invalidState() to fail all outstanding
1012     // callbacks and move the socket into the error state.  There's most likely
1013     // a bug in the caller's code, so we abort everything rather than trying to
1014     // proceed as best we can.
1015     return invalidState(callback);
1016   }
1017
1018   uint32_t countWritten = 0;
1019   uint32_t partialWritten = 0;
1020   ssize_t bytesWritten = 0;
1021   bool mustRegister = false;
1022   if ((state_ == StateEnum::ESTABLISHED || state_ == StateEnum::FAST_OPEN) &&
1023       !connecting()) {
1024     if (writeReqHead_ == nullptr) {
1025       // If we are established and there are no other writes pending,
1026       // we can attempt to perform the write immediately.
1027       assert(writeReqTail_ == nullptr);
1028       assert((eventFlags_ & EventHandler::WRITE) == 0);
1029
1030       auto writeResult = performWrite(
1031           vec, uint32_t(count), flags, &countWritten, &partialWritten);
1032       bytesWritten = writeResult.writeReturn;
1033       if (bytesWritten < 0) {
1034         auto errnoCopy = errno;
1035         if (writeResult.exception) {
1036           return failWrite(__func__, callback, 0, *writeResult.exception);
1037         }
1038         AsyncSocketException ex(
1039             AsyncSocketException::INTERNAL_ERROR,
1040             withAddr("writev failed"),
1041             errnoCopy);
1042         return failWrite(__func__, callback, 0, ex);
1043       } else if (countWritten == count) {
1044         // done, add the whole buffer
1045         if (isZeroCopyRequest(flags)) {
1046           addZeroCopyBuff(std::move(ioBuf));
1047         }
1048         // We successfully wrote everything.
1049         // Invoke the callback and return.
1050         if (callback) {
1051           callback->writeSuccess();
1052         }
1053         return;
1054       } else { // continue writing the next writeReq
1055         // add just the ptr
1056         if (isZeroCopyRequest(flags)) {
1057           addZeroCopyBuff(ioBuf.get());
1058         }
1059         if (bufferCallback_) {
1060           bufferCallback_->onEgressBuffered();
1061         }
1062       }
1063       if (!connecting()) {
1064         // Writes might put the socket back into connecting state
1065         // if TFO is enabled, and using TFO fails.
1066         // This means that write timeouts would not be active, however
1067         // connect timeouts would affect this stage.
1068         mustRegister = true;
1069       }
1070     }
1071   } else if (!connecting()) {
1072     // Invalid state for writing
1073     return invalidState(callback);
1074   }
1075
1076   // Create a new WriteRequest to add to the queue
1077   WriteRequest* req;
1078   try {
1079     req = BytesWriteRequest::newRequest(
1080         this,
1081         callback,
1082         vec + countWritten,
1083         uint32_t(count - countWritten),
1084         partialWritten,
1085         uint32_t(bytesWritten),
1086         std::move(ioBuf),
1087         flags);
1088   } catch (const std::exception& ex) {
1089     // we mainly expect to catch std::bad_alloc here
1090     AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR,
1091         withAddr(string("failed to append new WriteRequest: ") + ex.what()));
1092     return failWrite(__func__, callback, size_t(bytesWritten), tex);
1093   }
1094   req->consume();
1095   if (writeReqTail_ == nullptr) {
1096     assert(writeReqHead_ == nullptr);
1097     writeReqHead_ = writeReqTail_ = req;
1098   } else {
1099     writeReqTail_->append(req);
1100     writeReqTail_ = req;
1101   }
1102
1103   // Register for write events if are established and not currently
1104   // waiting on write events
1105   if (mustRegister) {
1106     assert(state_ == StateEnum::ESTABLISHED);
1107     assert((eventFlags_ & EventHandler::WRITE) == 0);
1108     if (!updateEventRegistration(EventHandler::WRITE, 0)) {
1109       assert(state_ == StateEnum::ERROR);
1110       return;
1111     }
1112     if (sendTimeout_ > 0) {
1113       // Schedule a timeout to fire if the write takes too long.
1114       if (!writeTimeout_.scheduleTimeout(sendTimeout_)) {
1115         AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1116                                withAddr("failed to schedule send timeout"));
1117         return failWrite(__func__, ex);
1118       }
1119     }
1120   }
1121 }
1122
1123 void AsyncSocket::writeRequest(WriteRequest* req) {
1124   if (writeReqTail_ == nullptr) {
1125     assert(writeReqHead_ == nullptr);
1126     writeReqHead_ = writeReqTail_ = req;
1127     req->start();
1128   } else {
1129     writeReqTail_->append(req);
1130     writeReqTail_ = req;
1131   }
1132 }
1133
1134 void AsyncSocket::close() {
1135   VLOG(5) << "AsyncSocket::close(): this=" << this << ", fd_=" << fd_
1136           << ", state=" << state_ << ", shutdownFlags="
1137           << std::hex << (int) shutdownFlags_;
1138
1139   // close() is only different from closeNow() when there are pending writes
1140   // that need to drain before we can close.  In all other cases, just call
1141   // closeNow().
1142   //
1143   // Note that writeReqHead_ can be non-nullptr even in STATE_CLOSED or
1144   // STATE_ERROR if close() is invoked while a previous closeNow() or failure
1145   // is still running.  (e.g., If there are multiple pending writes, and we
1146   // call writeError() on the first one, it may call close().  In this case we
1147   // will already be in STATE_CLOSED or STATE_ERROR, but the remaining pending
1148   // writes will still be in the queue.)
1149   //
1150   // We only need to drain pending writes if we are still in STATE_CONNECTING
1151   // or STATE_ESTABLISHED
1152   if ((writeReqHead_ == nullptr) ||
1153       !(state_ == StateEnum::CONNECTING ||
1154       state_ == StateEnum::ESTABLISHED)) {
1155     closeNow();
1156     return;
1157   }
1158
1159   // Declare a DestructorGuard to ensure that the AsyncSocket cannot be
1160   // destroyed until close() returns.
1161   DestructorGuard dg(this);
1162   eventBase_->dcheckIsInEventBaseThread();
1163
1164   // Since there are write requests pending, we have to set the
1165   // SHUT_WRITE_PENDING flag, and wait to perform the real close until the
1166   // connect finishes and we finish writing these requests.
1167   //
1168   // Set SHUT_READ to indicate that reads are shut down, and set the
1169   // SHUT_WRITE_PENDING flag to mark that we want to shutdown once the
1170   // pending writes complete.
1171   shutdownFlags_ |= (SHUT_READ | SHUT_WRITE_PENDING);
1172
1173   // If a read callback is set, invoke readEOF() immediately to inform it that
1174   // the socket has been closed and no more data can be read.
1175   if (readCallback_) {
1176     // Disable reads if they are enabled
1177     if (!updateEventRegistration(0, EventHandler::READ)) {
1178       // We're now in the error state; callbacks have been cleaned up
1179       assert(state_ == StateEnum::ERROR);
1180       assert(readCallback_ == nullptr);
1181     } else {
1182       ReadCallback* callback = readCallback_;
1183       readCallback_ = nullptr;
1184       callback->readEOF();
1185     }
1186   }
1187 }
1188
1189 void AsyncSocket::closeNow() {
1190   VLOG(5) << "AsyncSocket::closeNow(): this=" << this << ", fd_=" << fd_
1191           << ", state=" << state_ << ", shutdownFlags="
1192           << std::hex << (int) shutdownFlags_;
1193   DestructorGuard dg(this);
1194   if (eventBase_) {
1195     eventBase_->dcheckIsInEventBaseThread();
1196   }
1197
1198   switch (state_) {
1199     case StateEnum::ESTABLISHED:
1200     case StateEnum::CONNECTING:
1201     case StateEnum::FAST_OPEN: {
1202       shutdownFlags_ |= (SHUT_READ | SHUT_WRITE);
1203       state_ = StateEnum::CLOSED;
1204
1205       // If the write timeout was set, cancel it.
1206       writeTimeout_.cancelTimeout();
1207
1208       // If we are registered for I/O events, unregister.
1209       if (eventFlags_ != EventHandler::NONE) {
1210         eventFlags_ = EventHandler::NONE;
1211         if (!updateEventRegistration()) {
1212           // We will have been moved into the error state.
1213           assert(state_ == StateEnum::ERROR);
1214           return;
1215         }
1216       }
1217
1218       if (immediateReadHandler_.isLoopCallbackScheduled()) {
1219         immediateReadHandler_.cancelLoopCallback();
1220       }
1221
1222       if (fd_ >= 0) {
1223         ioHandler_.changeHandlerFD(-1);
1224         doClose();
1225       }
1226
1227       invokeConnectErr(socketClosedLocallyEx);
1228
1229       failAllWrites(socketClosedLocallyEx);
1230
1231       if (readCallback_) {
1232         ReadCallback* callback = readCallback_;
1233         readCallback_ = nullptr;
1234         callback->readEOF();
1235       }
1236       return;
1237     }
1238     case StateEnum::CLOSED:
1239       // Do nothing.  It's possible that we are being called recursively
1240       // from inside a callback that we invoked inside another call to close()
1241       // that is still running.
1242       return;
1243     case StateEnum::ERROR:
1244       // Do nothing.  The error handling code has performed (or is performing)
1245       // cleanup.
1246       return;
1247     case StateEnum::UNINIT:
1248       assert(eventFlags_ == EventHandler::NONE);
1249       assert(connectCallback_ == nullptr);
1250       assert(readCallback_ == nullptr);
1251       assert(writeReqHead_ == nullptr);
1252       shutdownFlags_ |= (SHUT_READ | SHUT_WRITE);
1253       state_ = StateEnum::CLOSED;
1254       return;
1255   }
1256
1257   LOG(DFATAL) << "AsyncSocket::closeNow() (this=" << this << ", fd=" << fd_
1258               << ") called in unknown state " << state_;
1259 }
1260
1261 void AsyncSocket::closeWithReset() {
1262   // Enable SO_LINGER, with the linger timeout set to 0.
1263   // This will trigger a TCP reset when we close the socket.
1264   if (fd_ >= 0) {
1265     struct linger optLinger = {1, 0};
1266     if (setSockOpt(SOL_SOCKET, SO_LINGER, &optLinger) != 0) {
1267       VLOG(2) << "AsyncSocket::closeWithReset(): error setting SO_LINGER "
1268               << "on " << fd_ << ": errno=" << errno;
1269     }
1270   }
1271
1272   // Then let closeNow() take care of the rest
1273   closeNow();
1274 }
1275
1276 void AsyncSocket::shutdownWrite() {
1277   VLOG(5) << "AsyncSocket::shutdownWrite(): this=" << this << ", fd=" << fd_
1278           << ", state=" << state_ << ", shutdownFlags="
1279           << std::hex << (int) shutdownFlags_;
1280
1281   // If there are no pending writes, shutdownWrite() is identical to
1282   // shutdownWriteNow().
1283   if (writeReqHead_ == nullptr) {
1284     shutdownWriteNow();
1285     return;
1286   }
1287
1288   eventBase_->dcheckIsInEventBaseThread();
1289
1290   // There are pending writes.  Set SHUT_WRITE_PENDING so that the actual
1291   // shutdown will be performed once all writes complete.
1292   shutdownFlags_ |= SHUT_WRITE_PENDING;
1293 }
1294
1295 void AsyncSocket::shutdownWriteNow() {
1296   VLOG(5) << "AsyncSocket::shutdownWriteNow(): this=" << this
1297           << ", fd=" << fd_ << ", state=" << state_
1298           << ", shutdownFlags=" << std::hex << (int) shutdownFlags_;
1299
1300   if (shutdownFlags_ & SHUT_WRITE) {
1301     // Writes are already shutdown; nothing else to do.
1302     return;
1303   }
1304
1305   // If SHUT_READ is already set, just call closeNow() to completely
1306   // close the socket.  This can happen if close() was called with writes
1307   // pending, and then shutdownWriteNow() is called before all pending writes
1308   // complete.
1309   if (shutdownFlags_ & SHUT_READ) {
1310     closeNow();
1311     return;
1312   }
1313
1314   DestructorGuard dg(this);
1315   if (eventBase_) {
1316     eventBase_->dcheckIsInEventBaseThread();
1317   }
1318
1319   switch (static_cast<StateEnum>(state_)) {
1320     case StateEnum::ESTABLISHED:
1321     {
1322       shutdownFlags_ |= SHUT_WRITE;
1323
1324       // If the write timeout was set, cancel it.
1325       writeTimeout_.cancelTimeout();
1326
1327       // If we are registered for write events, unregister.
1328       if (!updateEventRegistration(0, EventHandler::WRITE)) {
1329         // We will have been moved into the error state.
1330         assert(state_ == StateEnum::ERROR);
1331         return;
1332       }
1333
1334       // Shutdown writes on the file descriptor
1335       shutdown(fd_, SHUT_WR);
1336
1337       // Immediately fail all write requests
1338       failAllWrites(socketShutdownForWritesEx);
1339       return;
1340     }
1341     case StateEnum::CONNECTING:
1342     {
1343       // Set the SHUT_WRITE_PENDING flag.
1344       // When the connection completes, it will check this flag,
1345       // shutdown the write half of the socket, and then set SHUT_WRITE.
1346       shutdownFlags_ |= SHUT_WRITE_PENDING;
1347
1348       // Immediately fail all write requests
1349       failAllWrites(socketShutdownForWritesEx);
1350       return;
1351     }
1352     case StateEnum::UNINIT:
1353       // Callers normally shouldn't call shutdownWriteNow() before the socket
1354       // even starts connecting.  Nonetheless, go ahead and set
1355       // SHUT_WRITE_PENDING.  Once the socket eventually connects it will
1356       // immediately shut down the write side of the socket.
1357       shutdownFlags_ |= SHUT_WRITE_PENDING;
1358       return;
1359     case StateEnum::FAST_OPEN:
1360       // In fast open state we haven't call connected yet, and if we shutdown
1361       // the writes, we will never try to call connect, so shut everything down
1362       shutdownFlags_ |= SHUT_WRITE;
1363       // Immediately fail all write requests
1364       failAllWrites(socketShutdownForWritesEx);
1365       return;
1366     case StateEnum::CLOSED:
1367     case StateEnum::ERROR:
1368       // We should never get here.  SHUT_WRITE should always be set
1369       // in STATE_CLOSED and STATE_ERROR.
1370       VLOG(4) << "AsyncSocket::shutdownWriteNow() (this=" << this
1371                  << ", fd=" << fd_ << ") in unexpected state " << state_
1372                  << " with SHUT_WRITE not set ("
1373                  << std::hex << (int) shutdownFlags_ << ")";
1374       assert(false);
1375       return;
1376   }
1377
1378   LOG(DFATAL) << "AsyncSocket::shutdownWriteNow() (this=" << this << ", fd="
1379               << fd_ << ") called in unknown state " << state_;
1380 }
1381
1382 bool AsyncSocket::readable() const {
1383   if (fd_ == -1) {
1384     return false;
1385   }
1386   struct pollfd fds[1];
1387   fds[0].fd = fd_;
1388   fds[0].events = POLLIN;
1389   fds[0].revents = 0;
1390   int rc = poll(fds, 1, 0);
1391   return rc == 1;
1392 }
1393
1394 bool AsyncSocket::writable() const {
1395   if (fd_ == -1) {
1396     return false;
1397   }
1398   struct pollfd fds[1];
1399   fds[0].fd = fd_;
1400   fds[0].events = POLLOUT;
1401   fds[0].revents = 0;
1402   int rc = poll(fds, 1, 0);
1403   return rc == 1;
1404 }
1405
1406 bool AsyncSocket::isPending() const {
1407   return ioHandler_.isPending();
1408 }
1409
1410 bool AsyncSocket::hangup() const {
1411   if (fd_ == -1) {
1412     // sanity check, no one should ask for hangup if we are not connected.
1413     assert(false);
1414     return false;
1415   }
1416 #ifdef POLLRDHUP // Linux-only
1417   struct pollfd fds[1];
1418   fds[0].fd = fd_;
1419   fds[0].events = POLLRDHUP|POLLHUP;
1420   fds[0].revents = 0;
1421   poll(fds, 1, 0);
1422   return (fds[0].revents & (POLLRDHUP|POLLHUP)) != 0;
1423 #else
1424   return false;
1425 #endif
1426 }
1427
1428 bool AsyncSocket::good() const {
1429   return (
1430       (state_ == StateEnum::CONNECTING || state_ == StateEnum::FAST_OPEN ||
1431        state_ == StateEnum::ESTABLISHED) &&
1432       (shutdownFlags_ == 0) && (eventBase_ != nullptr));
1433 }
1434
1435 bool AsyncSocket::error() const {
1436   return (state_ == StateEnum::ERROR);
1437 }
1438
1439 void AsyncSocket::attachEventBase(EventBase* eventBase) {
1440   VLOG(5) << "AsyncSocket::attachEventBase(this=" << this << ", fd=" << fd_
1441           << ", old evb=" << eventBase_ << ", new evb=" << eventBase
1442           << ", state=" << state_ << ", events="
1443           << std::hex << eventFlags_ << ")";
1444   assert(eventBase_ == nullptr);
1445   eventBase->dcheckIsInEventBaseThread();
1446
1447   eventBase_ = eventBase;
1448   ioHandler_.attachEventBase(eventBase);
1449   writeTimeout_.attachEventBase(eventBase);
1450   if (evbChangeCb_) {
1451     evbChangeCb_->evbAttached(this);
1452   }
1453 }
1454
1455 void AsyncSocket::detachEventBase() {
1456   VLOG(5) << "AsyncSocket::detachEventBase(this=" << this << ", fd=" << fd_
1457           << ", old evb=" << eventBase_ << ", state=" << state_
1458           << ", events=" << std::hex << eventFlags_ << ")";
1459   assert(eventBase_ != nullptr);
1460   eventBase_->dcheckIsInEventBaseThread();
1461
1462   eventBase_ = nullptr;
1463   ioHandler_.detachEventBase();
1464   writeTimeout_.detachEventBase();
1465   if (evbChangeCb_) {
1466     evbChangeCb_->evbDetached(this);
1467   }
1468 }
1469
1470 bool AsyncSocket::isDetachable() const {
1471   DCHECK(eventBase_ != nullptr);
1472   eventBase_->dcheckIsInEventBaseThread();
1473
1474   return !ioHandler_.isHandlerRegistered() && !writeTimeout_.isScheduled();
1475 }
1476
1477 void AsyncSocket::cacheAddresses() {
1478   if (fd_ >= 0) {
1479     try {
1480       cacheLocalAddress();
1481       cachePeerAddress();
1482     } catch (const std::system_error& e) {
1483       if (e.code() != std::error_code(ENOTCONN, std::system_category())) {
1484         VLOG(1) << "Error caching addresses: " << e.code().value() << ", "
1485                 << e.code().message();
1486       }
1487     }
1488   }
1489 }
1490
1491 void AsyncSocket::cacheLocalAddress() const {
1492   if (!localAddr_.isInitialized()) {
1493     localAddr_.setFromLocalAddress(fd_);
1494   }
1495 }
1496
1497 void AsyncSocket::cachePeerAddress() const {
1498   if (!addr_.isInitialized()) {
1499     addr_.setFromPeerAddress(fd_);
1500   }
1501 }
1502
1503 void AsyncSocket::getLocalAddress(folly::SocketAddress* address) const {
1504   cacheLocalAddress();
1505   *address = localAddr_;
1506 }
1507
1508 void AsyncSocket::getPeerAddress(folly::SocketAddress* address) const {
1509   cachePeerAddress();
1510   *address = addr_;
1511 }
1512
1513 bool AsyncSocket::getTFOSucceded() const {
1514   return detail::tfo_succeeded(fd_);
1515 }
1516
1517 int AsyncSocket::setNoDelay(bool noDelay) {
1518   if (fd_ < 0) {
1519     VLOG(4) << "AsyncSocket::setNoDelay() called on non-open socket "
1520                << this << "(state=" << state_ << ")";
1521     return EINVAL;
1522
1523   }
1524
1525   int value = noDelay ? 1 : 0;
1526   if (setsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &value, sizeof(value)) != 0) {
1527     int errnoCopy = errno;
1528     VLOG(2) << "failed to update TCP_NODELAY option on AsyncSocket "
1529             << this << " (fd=" << fd_ << ", state=" << state_ << "): "
1530             << strerror(errnoCopy);
1531     return errnoCopy;
1532   }
1533
1534   return 0;
1535 }
1536
1537 int AsyncSocket::setCongestionFlavor(const std::string &cname) {
1538
1539   #ifndef TCP_CONGESTION
1540   #define TCP_CONGESTION  13
1541   #endif
1542
1543   if (fd_ < 0) {
1544     VLOG(4) << "AsyncSocket::setCongestionFlavor() called on non-open "
1545                << "socket " << this << "(state=" << state_ << ")";
1546     return EINVAL;
1547
1548   }
1549
1550   if (setsockopt(
1551           fd_,
1552           IPPROTO_TCP,
1553           TCP_CONGESTION,
1554           cname.c_str(),
1555           socklen_t(cname.length() + 1)) != 0) {
1556     int errnoCopy = errno;
1557     VLOG(2) << "failed to update TCP_CONGESTION option on AsyncSocket "
1558             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1559             << strerror(errnoCopy);
1560     return errnoCopy;
1561   }
1562
1563   return 0;
1564 }
1565
1566 int AsyncSocket::setQuickAck(bool quickack) {
1567   (void)quickack;
1568   if (fd_ < 0) {
1569     VLOG(4) << "AsyncSocket::setQuickAck() called on non-open socket "
1570                << this << "(state=" << state_ << ")";
1571     return EINVAL;
1572
1573   }
1574
1575 #ifdef TCP_QUICKACK // Linux-only
1576   int value = quickack ? 1 : 0;
1577   if (setsockopt(fd_, IPPROTO_TCP, TCP_QUICKACK, &value, sizeof(value)) != 0) {
1578     int errnoCopy = errno;
1579     VLOG(2) << "failed to update TCP_QUICKACK option on AsyncSocket"
1580             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1581             << strerror(errnoCopy);
1582     return errnoCopy;
1583   }
1584
1585   return 0;
1586 #else
1587   return ENOSYS;
1588 #endif
1589 }
1590
1591 int AsyncSocket::setSendBufSize(size_t bufsize) {
1592   if (fd_ < 0) {
1593     VLOG(4) << "AsyncSocket::setSendBufSize() called on non-open socket "
1594                << this << "(state=" << state_ << ")";
1595     return EINVAL;
1596   }
1597
1598   if (setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &bufsize, sizeof(bufsize)) !=0) {
1599     int errnoCopy = errno;
1600     VLOG(2) << "failed to update SO_SNDBUF option on AsyncSocket"
1601             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1602             << strerror(errnoCopy);
1603     return errnoCopy;
1604   }
1605
1606   return 0;
1607 }
1608
1609 int AsyncSocket::setRecvBufSize(size_t bufsize) {
1610   if (fd_ < 0) {
1611     VLOG(4) << "AsyncSocket::setRecvBufSize() called on non-open socket "
1612                << this << "(state=" << state_ << ")";
1613     return EINVAL;
1614   }
1615
1616   if (setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &bufsize, sizeof(bufsize)) !=0) {
1617     int errnoCopy = errno;
1618     VLOG(2) << "failed to update SO_RCVBUF option on AsyncSocket"
1619             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1620             << strerror(errnoCopy);
1621     return errnoCopy;
1622   }
1623
1624   return 0;
1625 }
1626
1627 int AsyncSocket::setTCPProfile(int profd) {
1628   if (fd_ < 0) {
1629     VLOG(4) << "AsyncSocket::setTCPProfile() called on non-open socket "
1630                << this << "(state=" << state_ << ")";
1631     return EINVAL;
1632   }
1633
1634   if (setsockopt(fd_, SOL_SOCKET, SO_SET_NAMESPACE, &profd, sizeof(int)) !=0) {
1635     int errnoCopy = errno;
1636     VLOG(2) << "failed to set socket namespace option on AsyncSocket"
1637             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1638             << strerror(errnoCopy);
1639     return errnoCopy;
1640   }
1641
1642   return 0;
1643 }
1644
1645 void AsyncSocket::ioReady(uint16_t events) noexcept {
1646   VLOG(7) << "AsyncSocket::ioRead() this=" << this << ", fd=" << fd_
1647           << ", events=" << std::hex << events << ", state=" << state_;
1648   DestructorGuard dg(this);
1649   assert(events & EventHandler::READ_WRITE);
1650   eventBase_->dcheckIsInEventBaseThread();
1651
1652   uint16_t relevantEvents = uint16_t(events & EventHandler::READ_WRITE);
1653   EventBase* originalEventBase = eventBase_;
1654   // If we got there it means that either EventHandler::READ or
1655   // EventHandler::WRITE is set. Any of these flags can
1656   // indicate that there are messages available in the socket
1657   // error message queue.
1658   handleErrMessages();
1659
1660   // Return now if handleErrMessages() detached us from our EventBase
1661   if (eventBase_ != originalEventBase) {
1662     return;
1663   }
1664
1665   if (relevantEvents == EventHandler::READ) {
1666     handleRead();
1667   } else if (relevantEvents == EventHandler::WRITE) {
1668     handleWrite();
1669   } else if (relevantEvents == EventHandler::READ_WRITE) {
1670     // If both read and write events are ready, process writes first.
1671     handleWrite();
1672
1673     // Return now if handleWrite() detached us from our EventBase
1674     if (eventBase_ != originalEventBase) {
1675       return;
1676     }
1677
1678     // Only call handleRead() if a read callback is still installed.
1679     // (It's possible that the read callback was uninstalled during
1680     // handleWrite().)
1681     if (readCallback_) {
1682       handleRead();
1683     }
1684   } else {
1685     VLOG(4) << "AsyncSocket::ioRead() called with unexpected events "
1686                << std::hex << events << "(this=" << this << ")";
1687     abort();
1688   }
1689 }
1690
1691 AsyncSocket::ReadResult
1692 AsyncSocket::performRead(void** buf, size_t* buflen, size_t* /* offset */) {
1693   VLOG(5) << "AsyncSocket::performRead() this=" << this << ", buf=" << *buf
1694           << ", buflen=" << *buflen;
1695
1696   if (preReceivedData_ && !preReceivedData_->empty()) {
1697     VLOG(5) << "AsyncSocket::performRead() this=" << this
1698             << ", reading pre-received data";
1699
1700     io::Cursor cursor(preReceivedData_.get());
1701     auto len = cursor.pullAtMost(*buf, *buflen);
1702
1703     IOBufQueue queue;
1704     queue.append(std::move(preReceivedData_));
1705     queue.trimStart(len);
1706     preReceivedData_ = queue.move();
1707
1708     appBytesReceived_ += len;
1709     return ReadResult(len);
1710   }
1711
1712   ssize_t bytes = recv(fd_, *buf, *buflen, MSG_DONTWAIT);
1713   if (bytes < 0) {
1714     if (errno == EAGAIN || errno == EWOULDBLOCK) {
1715       // No more data to read right now.
1716       return ReadResult(READ_BLOCKING);
1717     } else {
1718       return ReadResult(READ_ERROR);
1719     }
1720   } else {
1721     appBytesReceived_ += bytes;
1722     return ReadResult(bytes);
1723   }
1724 }
1725
1726 void AsyncSocket::prepareReadBuffer(void** buf, size_t* buflen) {
1727   // no matter what, buffer should be preapared for non-ssl socket
1728   CHECK(readCallback_);
1729   readCallback_->getReadBuffer(buf, buflen);
1730 }
1731
1732 void AsyncSocket::handleErrMessages() noexcept {
1733   // This method has non-empty implementation only for platforms
1734   // supporting per-socket error queues.
1735   VLOG(5) << "AsyncSocket::handleErrMessages() this=" << this << ", fd=" << fd_
1736           << ", state=" << state_;
1737   if (errMessageCallback_ == nullptr &&
1738       (!zeroCopyEnabled_ || idZeroCopyBufPtrMap_.empty())) {
1739     VLOG(7) << "AsyncSocket::handleErrMessages(): "
1740             << "no callback installed - exiting.";
1741     return;
1742   }
1743
1744 #ifdef MSG_ERRQUEUE
1745   uint8_t ctrl[1024];
1746   unsigned char data;
1747   struct msghdr msg;
1748   iovec entry;
1749
1750   entry.iov_base = &data;
1751   entry.iov_len = sizeof(data);
1752   msg.msg_iov = &entry;
1753   msg.msg_iovlen = 1;
1754   msg.msg_name = nullptr;
1755   msg.msg_namelen = 0;
1756   msg.msg_control = ctrl;
1757   msg.msg_controllen = sizeof(ctrl);
1758   msg.msg_flags = 0;
1759
1760   int ret;
1761   while (true) {
1762     ret = recvmsg(fd_, &msg, MSG_ERRQUEUE);
1763     VLOG(5) << "AsyncSocket::handleErrMessages(): recvmsg returned " << ret;
1764
1765     if (ret < 0) {
1766       if (errno != EAGAIN) {
1767         auto errnoCopy = errno;
1768         LOG(ERROR) << "::recvmsg exited with code " << ret
1769                    << ", errno: " << errnoCopy;
1770         AsyncSocketException ex(
1771           AsyncSocketException::INTERNAL_ERROR,
1772           withAddr("recvmsg() failed"),
1773           errnoCopy);
1774         failErrMessageRead(__func__, ex);
1775       }
1776       return;
1777     }
1778
1779     for (struct cmsghdr* cmsg = CMSG_FIRSTHDR(&msg);
1780          cmsg != nullptr && cmsg->cmsg_len != 0;
1781          cmsg = CMSG_NXTHDR(&msg, cmsg)) {
1782       if (isZeroCopyMsg(*cmsg)) {
1783         processZeroCopyMsg(*cmsg);
1784       } else {
1785         if (errMessageCallback_) {
1786           errMessageCallback_->errMessage(*cmsg);
1787         }
1788       }
1789     }
1790   }
1791 #endif //MSG_ERRQUEUE
1792 }
1793
1794 void AsyncSocket::handleRead() noexcept {
1795   VLOG(5) << "AsyncSocket::handleRead() this=" << this << ", fd=" << fd_
1796           << ", state=" << state_;
1797   assert(state_ == StateEnum::ESTABLISHED);
1798   assert((shutdownFlags_ & SHUT_READ) == 0);
1799   assert(readCallback_ != nullptr);
1800   assert(eventFlags_ & EventHandler::READ);
1801
1802   // Loop until:
1803   // - a read attempt would block
1804   // - readCallback_ is uninstalled
1805   // - the number of loop iterations exceeds the optional maximum
1806   // - this AsyncSocket is moved to another EventBase
1807   //
1808   // When we invoke readDataAvailable() it may uninstall the readCallback_,
1809   // which is why need to check for it here.
1810   //
1811   // The last bullet point is slightly subtle.  readDataAvailable() may also
1812   // detach this socket from this EventBase.  However, before
1813   // readDataAvailable() returns another thread may pick it up, attach it to
1814   // a different EventBase, and install another readCallback_.  We need to
1815   // exit immediately after readDataAvailable() returns if the eventBase_ has
1816   // changed.  (The caller must perform some sort of locking to transfer the
1817   // AsyncSocket between threads properly.  This will be sufficient to ensure
1818   // that this thread sees the updated eventBase_ variable after
1819   // readDataAvailable() returns.)
1820   uint16_t numReads = 0;
1821   EventBase* originalEventBase = eventBase_;
1822   while (readCallback_ && eventBase_ == originalEventBase) {
1823     // Get the buffer to read into.
1824     void* buf = nullptr;
1825     size_t buflen = 0, offset = 0;
1826     try {
1827       prepareReadBuffer(&buf, &buflen);
1828       VLOG(5) << "prepareReadBuffer() buf=" << buf << ", buflen=" << buflen;
1829     } catch (const AsyncSocketException& ex) {
1830       return failRead(__func__, ex);
1831     } catch (const std::exception& ex) {
1832       AsyncSocketException tex(AsyncSocketException::BAD_ARGS,
1833                               string("ReadCallback::getReadBuffer() "
1834                                      "threw exception: ") +
1835                               ex.what());
1836       return failRead(__func__, tex);
1837     } catch (...) {
1838       AsyncSocketException ex(AsyncSocketException::BAD_ARGS,
1839                              "ReadCallback::getReadBuffer() threw "
1840                              "non-exception type");
1841       return failRead(__func__, ex);
1842     }
1843     if (!isBufferMovable_ && (buf == nullptr || buflen == 0)) {
1844       AsyncSocketException ex(AsyncSocketException::BAD_ARGS,
1845                              "ReadCallback::getReadBuffer() returned "
1846                              "empty buffer");
1847       return failRead(__func__, ex);
1848     }
1849
1850     // Perform the read
1851     auto readResult = performRead(&buf, &buflen, &offset);
1852     auto bytesRead = readResult.readReturn;
1853     VLOG(4) << "this=" << this << ", AsyncSocket::handleRead() got "
1854             << bytesRead << " bytes";
1855     if (bytesRead > 0) {
1856       if (!isBufferMovable_) {
1857         readCallback_->readDataAvailable(size_t(bytesRead));
1858       } else {
1859         CHECK(kOpenSslModeMoveBufferOwnership);
1860         VLOG(5) << "this=" << this << ", AsyncSocket::handleRead() got "
1861                 << "buf=" << buf << ", " << bytesRead << "/" << buflen
1862                 << ", offset=" << offset;
1863         auto readBuf = folly::IOBuf::takeOwnership(buf, buflen);
1864         readBuf->trimStart(offset);
1865         readBuf->trimEnd(buflen - offset - bytesRead);
1866         readCallback_->readBufferAvailable(std::move(readBuf));
1867       }
1868
1869       // Fall through and continue around the loop if the read
1870       // completely filled the available buffer.
1871       // Note that readCallback_ may have been uninstalled or changed inside
1872       // readDataAvailable().
1873       if (size_t(bytesRead) < buflen) {
1874         return;
1875       }
1876     } else if (bytesRead == READ_BLOCKING) {
1877         // No more data to read right now.
1878         return;
1879     } else if (bytesRead == READ_ERROR) {
1880       readErr_ = READ_ERROR;
1881       if (readResult.exception) {
1882         return failRead(__func__, *readResult.exception);
1883       }
1884       auto errnoCopy = errno;
1885       AsyncSocketException ex(
1886           AsyncSocketException::INTERNAL_ERROR,
1887           withAddr("recv() failed"),
1888           errnoCopy);
1889       return failRead(__func__, ex);
1890     } else {
1891       assert(bytesRead == READ_EOF);
1892       readErr_ = READ_EOF;
1893       // EOF
1894       shutdownFlags_ |= SHUT_READ;
1895       if (!updateEventRegistration(0, EventHandler::READ)) {
1896         // we've already been moved into STATE_ERROR
1897         assert(state_ == StateEnum::ERROR);
1898         assert(readCallback_ == nullptr);
1899         return;
1900       }
1901
1902       ReadCallback* callback = readCallback_;
1903       readCallback_ = nullptr;
1904       callback->readEOF();
1905       return;
1906     }
1907     if (maxReadsPerEvent_ && (++numReads >= maxReadsPerEvent_)) {
1908       if (readCallback_ != nullptr) {
1909         // We might still have data in the socket.
1910         // (e.g. see comment in AsyncSSLSocket::checkForImmediateRead)
1911         scheduleImmediateRead();
1912       }
1913       return;
1914     }
1915   }
1916 }
1917
1918 /**
1919  * This function attempts to write as much data as possible, until no more data
1920  * can be written.
1921  *
1922  * - If it sends all available data, it unregisters for write events, and stops
1923  *   the writeTimeout_.
1924  *
1925  * - If not all of the data can be sent immediately, it reschedules
1926  *   writeTimeout_ (if a non-zero timeout is set), and ensures the handler is
1927  *   registered for write events.
1928  */
1929 void AsyncSocket::handleWrite() noexcept {
1930   VLOG(5) << "AsyncSocket::handleWrite() this=" << this << ", fd=" << fd_
1931           << ", state=" << state_;
1932   DestructorGuard dg(this);
1933
1934   if (state_ == StateEnum::CONNECTING) {
1935     handleConnect();
1936     return;
1937   }
1938
1939   // Normal write
1940   assert(state_ == StateEnum::ESTABLISHED);
1941   assert((shutdownFlags_ & SHUT_WRITE) == 0);
1942   assert(writeReqHead_ != nullptr);
1943
1944   // Loop until we run out of write requests,
1945   // or until this socket is moved to another EventBase.
1946   // (See the comment in handleRead() explaining how this can happen.)
1947   EventBase* originalEventBase = eventBase_;
1948   while (writeReqHead_ != nullptr && eventBase_ == originalEventBase) {
1949     auto writeResult = writeReqHead_->performWrite();
1950     if (writeResult.writeReturn < 0) {
1951       if (writeResult.exception) {
1952         return failWrite(__func__, *writeResult.exception);
1953       }
1954       auto errnoCopy = errno;
1955       AsyncSocketException ex(
1956           AsyncSocketException::INTERNAL_ERROR,
1957           withAddr("writev() failed"),
1958           errnoCopy);
1959       return failWrite(__func__, ex);
1960     } else if (writeReqHead_->isComplete()) {
1961       // We finished this request
1962       WriteRequest* req = writeReqHead_;
1963       writeReqHead_ = req->getNext();
1964
1965       if (writeReqHead_ == nullptr) {
1966         writeReqTail_ = nullptr;
1967         // This is the last write request.
1968         // Unregister for write events and cancel the send timer
1969         // before we invoke the callback.  We have to update the state properly
1970         // before calling the callback, since it may want to detach us from
1971         // the EventBase.
1972         if (eventFlags_ & EventHandler::WRITE) {
1973           if (!updateEventRegistration(0, EventHandler::WRITE)) {
1974             assert(state_ == StateEnum::ERROR);
1975             return;
1976           }
1977           // Stop the send timeout
1978           writeTimeout_.cancelTimeout();
1979         }
1980         assert(!writeTimeout_.isScheduled());
1981
1982         // If SHUT_WRITE_PENDING is set, we should shutdown the socket after
1983         // we finish sending the last write request.
1984         //
1985         // We have to do this before invoking writeSuccess(), since
1986         // writeSuccess() may detach us from our EventBase.
1987         if (shutdownFlags_ & SHUT_WRITE_PENDING) {
1988           assert(connectCallback_ == nullptr);
1989           shutdownFlags_ |= SHUT_WRITE;
1990
1991           if (shutdownFlags_ & SHUT_READ) {
1992             // Reads have already been shutdown.  Fully close the socket and
1993             // move to STATE_CLOSED.
1994             //
1995             // Note: This code currently moves us to STATE_CLOSED even if
1996             // close() hasn't ever been called.  This can occur if we have
1997             // received EOF from the peer and shutdownWrite() has been called
1998             // locally.  Should we bother staying in STATE_ESTABLISHED in this
1999             // case, until close() is actually called?  I can't think of a
2000             // reason why we would need to do so.  No other operations besides
2001             // calling close() or destroying the socket can be performed at
2002             // this point.
2003             assert(readCallback_ == nullptr);
2004             state_ = StateEnum::CLOSED;
2005             if (fd_ >= 0) {
2006               ioHandler_.changeHandlerFD(-1);
2007               doClose();
2008             }
2009           } else {
2010             // Reads are still enabled, so we are only doing a half-shutdown
2011             shutdown(fd_, SHUT_WR);
2012           }
2013         }
2014       }
2015
2016       // Invoke the callback
2017       WriteCallback* callback = req->getCallback();
2018       req->destroy();
2019       if (callback) {
2020         callback->writeSuccess();
2021       }
2022       // We'll continue around the loop, trying to write another request
2023     } else {
2024       // Partial write.
2025       if (bufferCallback_) {
2026         bufferCallback_->onEgressBuffered();
2027       }
2028       writeReqHead_->consume();
2029       // Stop after a partial write; it's highly likely that a subsequent write
2030       // attempt will just return EAGAIN.
2031       //
2032       // Ensure that we are registered for write events.
2033       if ((eventFlags_ & EventHandler::WRITE) == 0) {
2034         if (!updateEventRegistration(EventHandler::WRITE, 0)) {
2035           assert(state_ == StateEnum::ERROR);
2036           return;
2037         }
2038       }
2039
2040       // Reschedule the send timeout, since we have made some write progress.
2041       if (sendTimeout_ > 0) {
2042         if (!writeTimeout_.scheduleTimeout(sendTimeout_)) {
2043           AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
2044               withAddr("failed to reschedule write timeout"));
2045           return failWrite(__func__, ex);
2046         }
2047       }
2048       return;
2049     }
2050   }
2051   if (!writeReqHead_ && bufferCallback_) {
2052     bufferCallback_->onEgressBufferCleared();
2053   }
2054 }
2055
2056 void AsyncSocket::checkForImmediateRead() noexcept {
2057   // We currently don't attempt to perform optimistic reads in AsyncSocket.
2058   // (However, note that some subclasses do override this method.)
2059   //
2060   // Simply calling handleRead() here would be bad, as this would call
2061   // readCallback_->getReadBuffer(), forcing the callback to allocate a read
2062   // buffer even though no data may be available.  This would waste lots of
2063   // memory, since the buffer will sit around unused until the socket actually
2064   // becomes readable.
2065   //
2066   // Checking if the socket is readable now also seems like it would probably
2067   // be a pessimism.  In most cases it probably wouldn't be readable, and we
2068   // would just waste an extra system call.  Even if it is readable, waiting to
2069   // find out from libevent on the next event loop doesn't seem that bad.
2070   //
2071   // The exception to this is if we have pre-received data. In that case there
2072   // is definitely data available immediately.
2073   if (preReceivedData_ && !preReceivedData_->empty()) {
2074     handleRead();
2075   }
2076 }
2077
2078 void AsyncSocket::handleInitialReadWrite() noexcept {
2079   // Our callers should already be holding a DestructorGuard, but grab
2080   // one here just to make sure, in case one of our calling code paths ever
2081   // changes.
2082   DestructorGuard dg(this);
2083   // If we have a readCallback_, make sure we enable read events.  We
2084   // may already be registered for reads if connectSuccess() set
2085   // the read calback.
2086   if (readCallback_ && !(eventFlags_ & EventHandler::READ)) {
2087     assert(state_ == StateEnum::ESTABLISHED);
2088     assert((shutdownFlags_ & SHUT_READ) == 0);
2089     if (!updateEventRegistration(EventHandler::READ, 0)) {
2090       assert(state_ == StateEnum::ERROR);
2091       return;
2092     }
2093     checkForImmediateRead();
2094   } else if (readCallback_ == nullptr) {
2095     // Unregister for read events.
2096     updateEventRegistration(0, EventHandler::READ);
2097   }
2098
2099   // If we have write requests pending, try to send them immediately.
2100   // Since we just finished accepting, there is a very good chance that we can
2101   // write without blocking.
2102   //
2103   // However, we only process them if EventHandler::WRITE is not already set,
2104   // which means that we're already blocked on a write attempt.  (This can
2105   // happen if connectSuccess() called write() before returning.)
2106   if (writeReqHead_ && !(eventFlags_ & EventHandler::WRITE)) {
2107     // Call handleWrite() to perform write processing.
2108     handleWrite();
2109   } else if (writeReqHead_ == nullptr) {
2110     // Unregister for write event.
2111     updateEventRegistration(0, EventHandler::WRITE);
2112   }
2113 }
2114
2115 void AsyncSocket::handleConnect() noexcept {
2116   VLOG(5) << "AsyncSocket::handleConnect() this=" << this << ", fd=" << fd_
2117           << ", state=" << state_;
2118   assert(state_ == StateEnum::CONNECTING);
2119   // SHUT_WRITE can never be set while we are still connecting;
2120   // SHUT_WRITE_PENDING may be set, be we only set SHUT_WRITE once the connect
2121   // finishes
2122   assert((shutdownFlags_ & SHUT_WRITE) == 0);
2123
2124   // In case we had a connect timeout, cancel the timeout
2125   writeTimeout_.cancelTimeout();
2126   // We don't use a persistent registration when waiting on a connect event,
2127   // so we have been automatically unregistered now.  Update eventFlags_ to
2128   // reflect reality.
2129   assert(eventFlags_ == EventHandler::WRITE);
2130   eventFlags_ = EventHandler::NONE;
2131
2132   // Call getsockopt() to check if the connect succeeded
2133   int error;
2134   socklen_t len = sizeof(error);
2135   int rv = getsockopt(fd_, SOL_SOCKET, SO_ERROR, &error, &len);
2136   if (rv != 0) {
2137     auto errnoCopy = errno;
2138     AsyncSocketException ex(
2139         AsyncSocketException::INTERNAL_ERROR,
2140         withAddr("error calling getsockopt() after connect"),
2141         errnoCopy);
2142     VLOG(4) << "AsyncSocket::handleConnect(this=" << this << ", fd="
2143                << fd_ << " host=" << addr_.describe()
2144                << ") exception:" << ex.what();
2145     return failConnect(__func__, ex);
2146   }
2147
2148   if (error != 0) {
2149     AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
2150                            "connect failed", error);
2151     VLOG(1) << "AsyncSocket::handleConnect(this=" << this << ", fd="
2152             << fd_ << " host=" << addr_.describe()
2153             << ") exception: " << ex.what();
2154     return failConnect(__func__, ex);
2155   }
2156
2157   // Move into STATE_ESTABLISHED
2158   state_ = StateEnum::ESTABLISHED;
2159
2160   // If SHUT_WRITE_PENDING is set and we don't have any write requests to
2161   // perform, immediately shutdown the write half of the socket.
2162   if ((shutdownFlags_ & SHUT_WRITE_PENDING) && writeReqHead_ == nullptr) {
2163     // SHUT_READ shouldn't be set.  If close() is called on the socket while we
2164     // are still connecting we just abort the connect rather than waiting for
2165     // it to complete.
2166     assert((shutdownFlags_ & SHUT_READ) == 0);
2167     shutdown(fd_, SHUT_WR);
2168     shutdownFlags_ |= SHUT_WRITE;
2169   }
2170
2171   VLOG(7) << "AsyncSocket " << this << ": fd " << fd_
2172           << "successfully connected; state=" << state_;
2173
2174   // Remember the EventBase we are attached to, before we start invoking any
2175   // callbacks (since the callbacks may call detachEventBase()).
2176   EventBase* originalEventBase = eventBase_;
2177
2178   invokeConnectSuccess();
2179   // Note that the connect callback may have changed our state.
2180   // (set or unset the read callback, called write(), closed the socket, etc.)
2181   // The following code needs to handle these situations correctly.
2182   //
2183   // If the socket has been closed, readCallback_ and writeReqHead_ will
2184   // always be nullptr, so that will prevent us from trying to read or write.
2185   //
2186   // The main thing to check for is if eventBase_ is still originalEventBase.
2187   // If not, we have been detached from this event base, so we shouldn't
2188   // perform any more operations.
2189   if (eventBase_ != originalEventBase) {
2190     return;
2191   }
2192
2193   handleInitialReadWrite();
2194 }
2195
2196 void AsyncSocket::timeoutExpired() noexcept {
2197   VLOG(7) << "AsyncSocket " << this << ", fd " << fd_ << ": timeout expired: "
2198           << "state=" << state_ << ", events=" << std::hex << eventFlags_;
2199   DestructorGuard dg(this);
2200   eventBase_->dcheckIsInEventBaseThread();
2201
2202   if (state_ == StateEnum::CONNECTING) {
2203     // connect() timed out
2204     // Unregister for I/O events.
2205     if (connectCallback_) {
2206       AsyncSocketException ex(
2207           AsyncSocketException::TIMED_OUT,
2208           folly::sformat(
2209               "connect timed out after {}ms", connectTimeout_.count()));
2210       failConnect(__func__, ex);
2211     } else {
2212       // we faced a connect error without a connect callback, which could
2213       // happen due to TFO.
2214       AsyncSocketException ex(
2215           AsyncSocketException::TIMED_OUT, "write timed out during connection");
2216       failWrite(__func__, ex);
2217     }
2218   } else {
2219     // a normal write operation timed out
2220     AsyncSocketException ex(
2221         AsyncSocketException::TIMED_OUT,
2222         folly::sformat("write timed out after {}ms", sendTimeout_));
2223     failWrite(__func__, ex);
2224   }
2225 }
2226
2227 ssize_t AsyncSocket::tfoSendMsg(int fd, struct msghdr* msg, int msg_flags) {
2228   return detail::tfo_sendmsg(fd, msg, msg_flags);
2229 }
2230
2231 AsyncSocket::WriteResult
2232 AsyncSocket::sendSocketMessage(int fd, struct msghdr* msg, int msg_flags) {
2233   ssize_t totalWritten = 0;
2234   if (state_ == StateEnum::FAST_OPEN) {
2235     sockaddr_storage addr;
2236     auto len = addr_.getAddress(&addr);
2237     msg->msg_name = &addr;
2238     msg->msg_namelen = len;
2239     totalWritten = tfoSendMsg(fd_, msg, msg_flags);
2240     if (totalWritten >= 0) {
2241       tfoFinished_ = true;
2242       state_ = StateEnum::ESTABLISHED;
2243       // We schedule this asynchrously so that we don't end up
2244       // invoking initial read or write while a write is in progress.
2245       scheduleInitialReadWrite();
2246     } else if (errno == EINPROGRESS) {
2247       VLOG(4) << "TFO falling back to connecting";
2248       // A normal sendmsg doesn't return EINPROGRESS, however
2249       // TFO might fallback to connecting if there is no
2250       // cookie.
2251       state_ = StateEnum::CONNECTING;
2252       try {
2253         scheduleConnectTimeout();
2254         registerForConnectEvents();
2255       } catch (const AsyncSocketException& ex) {
2256         return WriteResult(
2257             WRITE_ERROR, std::make_unique<AsyncSocketException>(ex));
2258       }
2259       // Let's fake it that no bytes were written and return an errno.
2260       errno = EAGAIN;
2261       totalWritten = -1;
2262     } else if (errno == EOPNOTSUPP) {
2263       // Try falling back to connecting.
2264       VLOG(4) << "TFO not supported";
2265       state_ = StateEnum::CONNECTING;
2266       try {
2267         int ret = socketConnect((const sockaddr*)&addr, len);
2268         if (ret == 0) {
2269           // connect succeeded immediately
2270           // Treat this like no data was written.
2271           state_ = StateEnum::ESTABLISHED;
2272           scheduleInitialReadWrite();
2273         }
2274         // If there was no exception during connections,
2275         // we would return that no bytes were written.
2276         errno = EAGAIN;
2277         totalWritten = -1;
2278       } catch (const AsyncSocketException& ex) {
2279         return WriteResult(
2280             WRITE_ERROR, std::make_unique<AsyncSocketException>(ex));
2281       }
2282     } else if (errno == EAGAIN) {
2283       // Normally sendmsg would indicate that the write would block.
2284       // However in the fast open case, it would indicate that sendmsg
2285       // fell back to a connect. This is a return code from connect()
2286       // instead, and is an error condition indicating no fds available.
2287       return WriteResult(
2288           WRITE_ERROR,
2289           std::make_unique<AsyncSocketException>(
2290               AsyncSocketException::UNKNOWN, "No more free local ports"));
2291     }
2292   } else {
2293     totalWritten = ::sendmsg(fd, msg, msg_flags);
2294   }
2295   return WriteResult(totalWritten);
2296 }
2297
2298 AsyncSocket::WriteResult AsyncSocket::performWrite(
2299     const iovec* vec,
2300     uint32_t count,
2301     WriteFlags flags,
2302     uint32_t* countWritten,
2303     uint32_t* partialWritten) {
2304   // We use sendmsg() instead of writev() so that we can pass in MSG_NOSIGNAL
2305   // We correctly handle EPIPE errors, so we never want to receive SIGPIPE
2306   // (since it may terminate the program if the main program doesn't explicitly
2307   // ignore it).
2308   struct msghdr msg;
2309   msg.msg_name = nullptr;
2310   msg.msg_namelen = 0;
2311   msg.msg_iov = const_cast<iovec *>(vec);
2312   msg.msg_iovlen = std::min<size_t>(count, kIovMax);
2313   msg.msg_flags = 0;
2314   msg.msg_controllen = sendMsgParamCallback_->getAncillaryDataSize(flags);
2315   CHECK_GE(AsyncSocket::SendMsgParamsCallback::maxAncillaryDataSize,
2316            msg.msg_controllen);
2317
2318   if (msg.msg_controllen != 0) {
2319     msg.msg_control = reinterpret_cast<char*>(alloca(msg.msg_controllen));
2320     sendMsgParamCallback_->getAncillaryData(flags, msg.msg_control);
2321   } else {
2322     msg.msg_control = nullptr;
2323   }
2324   int msg_flags = sendMsgParamCallback_->getFlags(flags, zeroCopyEnabled_);
2325
2326   auto writeResult = sendSocketMessage(fd_, &msg, msg_flags);
2327   auto totalWritten = writeResult.writeReturn;
2328   if (totalWritten < 0) {
2329     bool tryAgain = (errno == EAGAIN);
2330 #ifdef __APPLE__
2331     // Apple has a bug where doing a second write on a socket which we
2332     // have opened with TFO causes an ENOTCONN to be thrown. However the
2333     // socket is really connected, so treat ENOTCONN as a EAGAIN until
2334     // this bug is fixed.
2335     tryAgain |= (errno == ENOTCONN);
2336 #endif
2337     if (!writeResult.exception && tryAgain) {
2338       // TCP buffer is full; we can't write any more data right now.
2339       *countWritten = 0;
2340       *partialWritten = 0;
2341       return WriteResult(0);
2342     }
2343     // error
2344     *countWritten = 0;
2345     *partialWritten = 0;
2346     return writeResult;
2347   }
2348
2349   appBytesWritten_ += totalWritten;
2350
2351   uint32_t bytesWritten;
2352   uint32_t n;
2353   for (bytesWritten = uint32_t(totalWritten), n = 0; n < count; ++n) {
2354     const iovec* v = vec + n;
2355     if (v->iov_len > bytesWritten) {
2356       // Partial write finished in the middle of this iovec
2357       *countWritten = n;
2358       *partialWritten = bytesWritten;
2359       return WriteResult(totalWritten);
2360     }
2361
2362     bytesWritten -= uint32_t(v->iov_len);
2363   }
2364
2365   assert(bytesWritten == 0);
2366   *countWritten = n;
2367   *partialWritten = 0;
2368   return WriteResult(totalWritten);
2369 }
2370
2371 /**
2372  * Re-register the EventHandler after eventFlags_ has changed.
2373  *
2374  * If an error occurs, fail() is called to move the socket into the error state
2375  * and call all currently installed callbacks.  After an error, the
2376  * AsyncSocket is completely unregistered.
2377  *
2378  * @return Returns true on success, or false on error.
2379  */
2380 bool AsyncSocket::updateEventRegistration() {
2381   VLOG(5) << "AsyncSocket::updateEventRegistration(this=" << this
2382           << ", fd=" << fd_ << ", evb=" << eventBase_ << ", state=" << state_
2383           << ", events=" << std::hex << eventFlags_;
2384   eventBase_->dcheckIsInEventBaseThread();
2385   if (eventFlags_ == EventHandler::NONE) {
2386     ioHandler_.unregisterHandler();
2387     return true;
2388   }
2389
2390   // Always register for persistent events, so we don't have to re-register
2391   // after being called back.
2392   if (!ioHandler_.registerHandler(
2393           uint16_t(eventFlags_ | EventHandler::PERSIST))) {
2394     eventFlags_ = EventHandler::NONE; // we're not registered after error
2395     AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
2396         withAddr("failed to update AsyncSocket event registration"));
2397     fail("updateEventRegistration", ex);
2398     return false;
2399   }
2400
2401   return true;
2402 }
2403
2404 bool AsyncSocket::updateEventRegistration(uint16_t enable,
2405                                            uint16_t disable) {
2406   uint16_t oldFlags = eventFlags_;
2407   eventFlags_ |= enable;
2408   eventFlags_ &= ~disable;
2409   if (eventFlags_ == oldFlags) {
2410     return true;
2411   } else {
2412     return updateEventRegistration();
2413   }
2414 }
2415
2416 void AsyncSocket::startFail() {
2417   // startFail() should only be called once
2418   assert(state_ != StateEnum::ERROR);
2419   assert(getDestructorGuardCount() > 0);
2420   state_ = StateEnum::ERROR;
2421   // Ensure that SHUT_READ and SHUT_WRITE are set,
2422   // so all future attempts to read or write will be rejected
2423   shutdownFlags_ |= (SHUT_READ | SHUT_WRITE);
2424
2425   if (eventFlags_ != EventHandler::NONE) {
2426     eventFlags_ = EventHandler::NONE;
2427     ioHandler_.unregisterHandler();
2428   }
2429   writeTimeout_.cancelTimeout();
2430
2431   if (fd_ >= 0) {
2432     ioHandler_.changeHandlerFD(-1);
2433     doClose();
2434   }
2435 }
2436
2437 void AsyncSocket::invokeAllErrors(const AsyncSocketException& ex) {
2438   invokeConnectErr(ex);
2439   failAllWrites(ex);
2440
2441   if (readCallback_) {
2442     ReadCallback* callback = readCallback_;
2443     readCallback_ = nullptr;
2444     callback->readErr(ex);
2445   }
2446 }
2447
2448 void AsyncSocket::finishFail() {
2449   assert(state_ == StateEnum::ERROR);
2450   assert(getDestructorGuardCount() > 0);
2451
2452   AsyncSocketException ex(
2453       AsyncSocketException::INTERNAL_ERROR,
2454       withAddr("socket closing after error"));
2455   invokeAllErrors(ex);
2456 }
2457
2458 void AsyncSocket::finishFail(const AsyncSocketException& ex) {
2459   assert(state_ == StateEnum::ERROR);
2460   assert(getDestructorGuardCount() > 0);
2461   invokeAllErrors(ex);
2462 }
2463
2464 void AsyncSocket::fail(const char* fn, const AsyncSocketException& ex) {
2465   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
2466              << state_ << " host=" << addr_.describe()
2467              << "): failed in " << fn << "(): "
2468              << ex.what();
2469   startFail();
2470   finishFail();
2471 }
2472
2473 void AsyncSocket::failConnect(const char* fn, const AsyncSocketException& ex) {
2474   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
2475                << state_ << " host=" << addr_.describe()
2476                << "): failed while connecting in " << fn << "(): "
2477                << ex.what();
2478   startFail();
2479
2480   invokeConnectErr(ex);
2481   finishFail(ex);
2482 }
2483
2484 void AsyncSocket::failRead(const char* fn, const AsyncSocketException& ex) {
2485   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
2486                << state_ << " host=" << addr_.describe()
2487                << "): failed while reading in " << fn << "(): "
2488                << ex.what();
2489   startFail();
2490
2491   if (readCallback_ != nullptr) {
2492     ReadCallback* callback = readCallback_;
2493     readCallback_ = nullptr;
2494     callback->readErr(ex);
2495   }
2496
2497   finishFail();
2498 }
2499
2500 void AsyncSocket::failErrMessageRead(const char* fn,
2501                                      const AsyncSocketException& ex) {
2502   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
2503                << state_ << " host=" << addr_.describe()
2504                << "): failed while reading message in " << fn << "(): "
2505                << ex.what();
2506   startFail();
2507
2508   if (errMessageCallback_ != nullptr) {
2509     ErrMessageCallback* callback = errMessageCallback_;
2510     errMessageCallback_ = nullptr;
2511     callback->errMessageError(ex);
2512   }
2513
2514   finishFail();
2515 }
2516
2517 void AsyncSocket::failWrite(const char* fn, const AsyncSocketException& ex) {
2518   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
2519                << state_ << " host=" << addr_.describe()
2520                << "): failed while writing in " << fn << "(): "
2521                << ex.what();
2522   startFail();
2523
2524   // Only invoke the first write callback, since the error occurred while
2525   // writing this request.  Let any other pending write callbacks be invoked in
2526   // finishFail().
2527   if (writeReqHead_ != nullptr) {
2528     WriteRequest* req = writeReqHead_;
2529     writeReqHead_ = req->getNext();
2530     WriteCallback* callback = req->getCallback();
2531     uint32_t bytesWritten = req->getTotalBytesWritten();
2532     req->destroy();
2533     if (callback) {
2534       callback->writeErr(bytesWritten, ex);
2535     }
2536   }
2537
2538   finishFail();
2539 }
2540
2541 void AsyncSocket::failWrite(const char* fn, WriteCallback* callback,
2542                              size_t bytesWritten,
2543                              const AsyncSocketException& ex) {
2544   // This version of failWrite() is used when the failure occurs before
2545   // we've added the callback to writeReqHead_.
2546   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
2547              << state_ << " host=" << addr_.describe()
2548              <<"): failed while writing in " << fn << "(): "
2549              << ex.what();
2550   startFail();
2551
2552   if (callback != nullptr) {
2553     callback->writeErr(bytesWritten, ex);
2554   }
2555
2556   finishFail();
2557 }
2558
2559 void AsyncSocket::failAllWrites(const AsyncSocketException& ex) {
2560   // Invoke writeError() on all write callbacks.
2561   // This is used when writes are forcibly shutdown with write requests
2562   // pending, or when an error occurs with writes pending.
2563   while (writeReqHead_ != nullptr) {
2564     WriteRequest* req = writeReqHead_;
2565     writeReqHead_ = req->getNext();
2566     WriteCallback* callback = req->getCallback();
2567     if (callback) {
2568       callback->writeErr(req->getTotalBytesWritten(), ex);
2569     }
2570     req->destroy();
2571   }
2572 }
2573
2574 void AsyncSocket::invalidState(ConnectCallback* callback) {
2575   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_
2576           << "): connect() called in invalid state " << state_;
2577
2578   /*
2579    * The invalidState() methods don't use the normal failure mechanisms,
2580    * since we don't know what state we are in.  We don't want to call
2581    * startFail()/finishFail() recursively if we are already in the middle of
2582    * cleaning up.
2583    */
2584
2585   AsyncSocketException ex(AsyncSocketException::ALREADY_OPEN,
2586                          "connect() called with socket in invalid state");
2587   connectEndTime_ = std::chrono::steady_clock::now();
2588   if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
2589     if (callback) {
2590       callback->connectErr(ex);
2591     }
2592   } else {
2593     // We can't use failConnect() here since connectCallback_
2594     // may already be set to another callback.  Invoke this ConnectCallback
2595     // here; any other connectCallback_ will be invoked in finishFail()
2596     startFail();
2597     if (callback) {
2598       callback->connectErr(ex);
2599     }
2600     finishFail();
2601   }
2602 }
2603
2604 void AsyncSocket::invalidState(ErrMessageCallback* callback) {
2605   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_
2606           << "): setErrMessageCB(" << callback
2607           << ") called in invalid state " << state_;
2608
2609   AsyncSocketException ex(
2610       AsyncSocketException::NOT_OPEN,
2611       msgErrQueueSupported
2612       ? "setErrMessageCB() called with socket in invalid state"
2613       : "This platform does not support socket error message notifications");
2614   if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
2615     if (callback) {
2616       callback->errMessageError(ex);
2617     }
2618   } else {
2619     startFail();
2620     if (callback) {
2621       callback->errMessageError(ex);
2622     }
2623     finishFail();
2624   }
2625 }
2626
2627 void AsyncSocket::invokeConnectErr(const AsyncSocketException& ex) {
2628   connectEndTime_ = std::chrono::steady_clock::now();
2629   if (connectCallback_) {
2630     ConnectCallback* callback = connectCallback_;
2631     connectCallback_ = nullptr;
2632     callback->connectErr(ex);
2633   }
2634 }
2635
2636 void AsyncSocket::invokeConnectSuccess() {
2637   connectEndTime_ = std::chrono::steady_clock::now();
2638   if (connectCallback_) {
2639     ConnectCallback* callback = connectCallback_;
2640     connectCallback_ = nullptr;
2641     callback->connectSuccess();
2642   }
2643 }
2644
2645 void AsyncSocket::invalidState(ReadCallback* callback) {
2646   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_
2647              << "): setReadCallback(" << callback
2648              << ") called in invalid state " << state_;
2649
2650   AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
2651                          "setReadCallback() called with socket in "
2652                          "invalid state");
2653   if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
2654     if (callback) {
2655       callback->readErr(ex);
2656     }
2657   } else {
2658     startFail();
2659     if (callback) {
2660       callback->readErr(ex);
2661     }
2662     finishFail();
2663   }
2664 }
2665
2666 void AsyncSocket::invalidState(WriteCallback* callback) {
2667   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_
2668              << "): write() called in invalid state " << state_;
2669
2670   AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
2671                          withAddr("write() called with socket in invalid state"));
2672   if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
2673     if (callback) {
2674       callback->writeErr(0, ex);
2675     }
2676   } else {
2677     startFail();
2678     if (callback) {
2679       callback->writeErr(0, ex);
2680     }
2681     finishFail();
2682   }
2683 }
2684
2685 void AsyncSocket::doClose() {
2686   if (fd_ == -1) return;
2687   if (shutdownSocketSet_) {
2688     shutdownSocketSet_->close(fd_);
2689   } else {
2690     ::close(fd_);
2691   }
2692   fd_ = -1;
2693 }
2694
2695 std::ostream& operator << (std::ostream& os,
2696                            const AsyncSocket::StateEnum& state) {
2697   os << static_cast<int>(state);
2698   return os;
2699 }
2700
2701 std::string AsyncSocket::withAddr(const std::string& s) {
2702   // Don't use addr_ directly because it may not be initialized
2703   // e.g. if constructed from fd
2704   folly::SocketAddress peer, local;
2705   try {
2706     getPeerAddress(&peer);
2707     getLocalAddress(&local);
2708   } catch (const std::exception&) {
2709     // ignore
2710   } catch (...) {
2711     // ignore
2712   }
2713   return s + " (peer=" + peer.describe() + ", local=" + local.describe() + ")";
2714 }
2715
2716 void AsyncSocket::setBufferCallback(BufferCallback* cb) {
2717   bufferCallback_ = cb;
2718 }
2719
2720 } // namespace folly