opt proxygen with newly added OpenSSL functions
[folly.git] / folly / io / async / AsyncSocket.cpp
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/io/async/AsyncSocket.h>
18
19 #include <folly/io/async/EventBase.h>
20 #include <folly/io/async/EventHandler.h>
21 #include <folly/SocketAddress.h>
22 #include <folly/io/IOBuf.h>
23
24 #include <poll.h>
25 #include <errno.h>
26 #include <limits.h>
27 #include <unistd.h>
28 #include <thread>
29 #include <fcntl.h>
30 #include <sys/types.h>
31 #include <sys/socket.h>
32 #include <netinet/in.h>
33 #include <netinet/tcp.h>
34
35 using std::string;
36 using std::unique_ptr;
37
38 namespace folly {
39
40 // static members initializers
41 const AsyncSocket::OptionMap AsyncSocket::emptyOptionMap;
42
43 const AsyncSocketException socketClosedLocallyEx(
44     AsyncSocketException::END_OF_FILE, "socket closed locally");
45 const AsyncSocketException socketShutdownForWritesEx(
46     AsyncSocketException::END_OF_FILE, "socket shutdown for writes");
47
48 // TODO: It might help performance to provide a version of BytesWriteRequest that
49 // users could derive from, so we can avoid the extra allocation for each call
50 // to write()/writev().  We could templatize TFramedAsyncChannel just like the
51 // protocols are currently templatized for transports.
52 //
53 // We would need the version for external users where they provide the iovec
54 // storage space, and only our internal version would allocate it at the end of
55 // the WriteRequest.
56
57 /* The default WriteRequest implementation, used for write(), writev() and
58  * writeChain()
59  *
60  * A new BytesWriteRequest operation is allocated on the heap for all write
61  * operations that cannot be completed immediately.
62  */
63 class AsyncSocket::BytesWriteRequest : public AsyncSocket::WriteRequest {
64  public:
65   static BytesWriteRequest* newRequest(AsyncSocket* socket,
66                                        WriteCallback* callback,
67                                        const iovec* ops,
68                                        uint32_t opCount,
69                                        uint32_t partialWritten,
70                                        uint32_t bytesWritten,
71                                        unique_ptr<IOBuf>&& ioBuf,
72                                        WriteFlags flags) {
73     assert(opCount > 0);
74     // Since we put a variable size iovec array at the end
75     // of each BytesWriteRequest, we have to manually allocate the memory.
76     void* buf = malloc(sizeof(BytesWriteRequest) +
77                        (opCount * sizeof(struct iovec)));
78     if (buf == nullptr) {
79       throw std::bad_alloc();
80     }
81
82     return new(buf) BytesWriteRequest(socket, callback, ops, opCount,
83                                       partialWritten, bytesWritten,
84                                       std::move(ioBuf), flags);
85   }
86
87   void destroy() override {
88     this->~BytesWriteRequest();
89     free(this);
90   }
91
92   bool performWrite() override {
93     WriteFlags writeFlags = flags_;
94     if (getNext() != nullptr) {
95       writeFlags = writeFlags | WriteFlags::CORK;
96     }
97     bytesWritten_ = socket_->performWrite(getOps(), getOpCount(), writeFlags,
98                                           &opsWritten_, &partialBytes_);
99     return bytesWritten_ >= 0;
100   }
101
102   bool isComplete() override {
103     return opsWritten_ == getOpCount();
104   }
105
106   void consume() override {
107     // Advance opIndex_ forward by opsWritten_
108     opIndex_ += opsWritten_;
109     assert(opIndex_ < opCount_);
110
111     // If we've finished writing any IOBufs, release them
112     if (ioBuf_) {
113       for (uint32_t i = opsWritten_; i != 0; --i) {
114         assert(ioBuf_);
115         ioBuf_ = ioBuf_->pop();
116       }
117     }
118
119     // Move partialBytes_ forward into the current iovec buffer
120     struct iovec* currentOp = writeOps_ + opIndex_;
121     assert((partialBytes_ < currentOp->iov_len) || (currentOp->iov_len == 0));
122     currentOp->iov_base =
123       reinterpret_cast<uint8_t*>(currentOp->iov_base) + partialBytes_;
124     currentOp->iov_len -= partialBytes_;
125
126     // Increment the totalBytesWritten_ count by bytesWritten_;
127     totalBytesWritten_ += bytesWritten_;
128   }
129
130  private:
131   BytesWriteRequest(AsyncSocket* socket,
132                     WriteCallback* callback,
133                     const struct iovec* ops,
134                     uint32_t opCount,
135                     uint32_t partialBytes,
136                     uint32_t bytesWritten,
137                     unique_ptr<IOBuf>&& ioBuf,
138                     WriteFlags flags)
139     : AsyncSocket::WriteRequest(socket, callback)
140     , opCount_(opCount)
141     , opIndex_(0)
142     , flags_(flags)
143     , ioBuf_(std::move(ioBuf))
144     , opsWritten_(0)
145     , partialBytes_(partialBytes)
146     , bytesWritten_(bytesWritten) {
147     memcpy(writeOps_, ops, sizeof(*ops) * opCount_);
148   }
149
150   // private destructor, to ensure callers use destroy()
151   ~BytesWriteRequest() override = default;
152
153   const struct iovec* getOps() const {
154     assert(opCount_ > opIndex_);
155     return writeOps_ + opIndex_;
156   }
157
158   uint32_t getOpCount() const {
159     assert(opCount_ > opIndex_);
160     return opCount_ - opIndex_;
161   }
162
163   uint32_t opCount_;            ///< number of entries in writeOps_
164   uint32_t opIndex_;            ///< current index into writeOps_
165   WriteFlags flags_;            ///< set for WriteFlags
166   unique_ptr<IOBuf> ioBuf_;     ///< underlying IOBuf, or nullptr if N/A
167
168   // for consume(), how much we wrote on the last write
169   uint32_t opsWritten_;         ///< complete ops written
170   uint32_t partialBytes_;       ///< partial bytes of incomplete op written
171   ssize_t bytesWritten_;        ///< bytes written altogether
172
173   struct iovec writeOps_[];     ///< write operation(s) list
174 };
175
176 AsyncSocket::AsyncSocket()
177   : eventBase_(nullptr)
178   , writeTimeout_(this, nullptr)
179   , ioHandler_(this, nullptr)
180   , immediateReadHandler_(this) {
181   VLOG(5) << "new AsyncSocket()";
182   init();
183 }
184
185 AsyncSocket::AsyncSocket(EventBase* evb)
186   : eventBase_(evb)
187   , writeTimeout_(this, evb)
188   , ioHandler_(this, evb)
189   , immediateReadHandler_(this) {
190   VLOG(5) << "new AsyncSocket(" << this << ", evb=" << evb << ")";
191   init();
192 }
193
194 AsyncSocket::AsyncSocket(EventBase* evb,
195                            const folly::SocketAddress& address,
196                            uint32_t connectTimeout)
197   : AsyncSocket(evb) {
198   connect(nullptr, address, connectTimeout);
199 }
200
201 AsyncSocket::AsyncSocket(EventBase* evb,
202                            const std::string& ip,
203                            uint16_t port,
204                            uint32_t connectTimeout)
205   : AsyncSocket(evb) {
206   connect(nullptr, ip, port, connectTimeout);
207 }
208
209 AsyncSocket::AsyncSocket(EventBase* evb, int fd)
210   : eventBase_(evb)
211   , writeTimeout_(this, evb)
212   , ioHandler_(this, evb, fd)
213   , immediateReadHandler_(this) {
214   VLOG(5) << "new AsyncSocket(" << this << ", evb=" << evb << ", fd="
215           << fd << ")";
216   init();
217   fd_ = fd;
218   setCloseOnExec();
219   state_ = StateEnum::ESTABLISHED;
220 }
221
222 // init() method, since constructor forwarding isn't supported in most
223 // compilers yet.
224 void AsyncSocket::init() {
225   assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
226   shutdownFlags_ = 0;
227   state_ = StateEnum::UNINIT;
228   eventFlags_ = EventHandler::NONE;
229   fd_ = -1;
230   sendTimeout_ = 0;
231   maxReadsPerEvent_ = 16;
232   connectCallback_ = nullptr;
233   readCallback_ = nullptr;
234   writeReqHead_ = nullptr;
235   writeReqTail_ = nullptr;
236   shutdownSocketSet_ = nullptr;
237   appBytesWritten_ = 0;
238   appBytesReceived_ = 0;
239 }
240
241 AsyncSocket::~AsyncSocket() {
242   VLOG(7) << "actual destruction of AsyncSocket(this=" << this
243           << ", evb=" << eventBase_ << ", fd=" << fd_
244           << ", state=" << state_ << ")";
245 }
246
247 void AsyncSocket::destroy() {
248   VLOG(5) << "AsyncSocket::destroy(this=" << this << ", evb=" << eventBase_
249           << ", fd=" << fd_ << ", state=" << state_;
250   // When destroy is called, close the socket immediately
251   closeNow();
252
253   // Then call DelayedDestruction::destroy() to take care of
254   // whether or not we need immediate or delayed destruction
255   DelayedDestruction::destroy();
256 }
257
258 int AsyncSocket::detachFd() {
259   VLOG(6) << "AsyncSocket::detachFd(this=" << this << ", fd=" << fd_
260           << ", evb=" << eventBase_ << ", state=" << state_
261           << ", events=" << std::hex << eventFlags_ << ")";
262   // Extract the fd, and set fd_ to -1 first, so closeNow() won't
263   // actually close the descriptor.
264   if (shutdownSocketSet_) {
265     shutdownSocketSet_->remove(fd_);
266   }
267   int fd = fd_;
268   fd_ = -1;
269   // Call closeNow() to invoke all pending callbacks with an error.
270   closeNow();
271   // Update the EventHandler to stop using this fd.
272   // This can only be done after closeNow() unregisters the handler.
273   ioHandler_.changeHandlerFD(-1);
274   return fd;
275 }
276
277 const folly::SocketAddress& AsyncSocket::anyAddress() {
278   static const folly::SocketAddress anyAddress =
279     folly::SocketAddress("0.0.0.0", 0);
280   return anyAddress;
281 }
282
283 void AsyncSocket::setShutdownSocketSet(ShutdownSocketSet* newSS) {
284   if (shutdownSocketSet_ == newSS) {
285     return;
286   }
287   if (shutdownSocketSet_ && fd_ != -1) {
288     shutdownSocketSet_->remove(fd_);
289   }
290   shutdownSocketSet_ = newSS;
291   if (shutdownSocketSet_ && fd_ != -1) {
292     shutdownSocketSet_->add(fd_);
293   }
294 }
295
296 void AsyncSocket::setCloseOnExec() {
297   int rv = fcntl(fd_, F_SETFD, FD_CLOEXEC);
298   if (rv != 0) {
299     throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
300                                withAddr("failed to set close-on-exec flag"),
301                                errno);
302   }
303 }
304
305 void AsyncSocket::connect(ConnectCallback* callback,
306                            const folly::SocketAddress& address,
307                            int timeout,
308                            const OptionMap &options,
309                            const folly::SocketAddress& bindAddr) noexcept {
310   DestructorGuard dg(this);
311   assert(eventBase_->isInEventBaseThread());
312
313   addr_ = address;
314
315   // Make sure we're in the uninitialized state
316   if (state_ != StateEnum::UNINIT) {
317     return invalidState(callback);
318   }
319
320   assert(fd_ == -1);
321   state_ = StateEnum::CONNECTING;
322   connectCallback_ = callback;
323
324   sockaddr_storage addrStorage;
325   sockaddr* saddr = reinterpret_cast<sockaddr*>(&addrStorage);
326
327   try {
328     // Create the socket
329     // Technically the first parameter should actually be a protocol family
330     // constant (PF_xxx) rather than an address family (AF_xxx), but the
331     // distinction is mainly just historical.  In pretty much all
332     // implementations the PF_foo and AF_foo constants are identical.
333     fd_ = socket(address.getFamily(), SOCK_STREAM, 0);
334     if (fd_ < 0) {
335       throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
336                                 withAddr("failed to create socket"), errno);
337     }
338     if (shutdownSocketSet_) {
339       shutdownSocketSet_->add(fd_);
340     }
341     ioHandler_.changeHandlerFD(fd_);
342
343     setCloseOnExec();
344
345     // Put the socket in non-blocking mode
346     int flags = fcntl(fd_, F_GETFL, 0);
347     if (flags == -1) {
348       throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
349                                 withAddr("failed to get socket flags"), errno);
350     }
351     int rv = fcntl(fd_, F_SETFL, flags | O_NONBLOCK);
352     if (rv == -1) {
353       throw AsyncSocketException(
354           AsyncSocketException::INTERNAL_ERROR,
355           withAddr("failed to put socket in non-blocking mode"),
356           errno);
357     }
358
359 #if !defined(MSG_NOSIGNAL) && defined(F_SETNOSIGPIPE)
360     // iOS and OS X don't support MSG_NOSIGNAL; set F_SETNOSIGPIPE instead
361     rv = fcntl(fd_, F_SETNOSIGPIPE, 1);
362     if (rv == -1) {
363       throw AsyncSocketException(
364           AsyncSocketException::INTERNAL_ERROR,
365           "failed to enable F_SETNOSIGPIPE on socket",
366           errno);
367     }
368 #endif
369
370     // By default, turn on TCP_NODELAY
371     // If setNoDelay() fails, we continue anyway; this isn't a fatal error.
372     // setNoDelay() will log an error message if it fails.
373     if (address.getFamily() != AF_UNIX) {
374       (void)setNoDelay(true);
375     }
376
377     VLOG(5) << "AsyncSocket::connect(this=" << this << ", evb=" << eventBase_
378             << ", fd=" << fd_ << ", host=" << address.describe().c_str();
379
380     // bind the socket
381     if (bindAddr != anyAddress()) {
382       int one = 1;
383       if (::setsockopt(fd_, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one))) {
384         doClose();
385         throw AsyncSocketException(
386           AsyncSocketException::NOT_OPEN,
387           "failed to setsockopt prior to bind on " + bindAddr.describe(),
388           errno);
389       }
390
391       bindAddr.getAddress(&addrStorage);
392
393       if (::bind(fd_, saddr, bindAddr.getActualSize()) != 0) {
394         doClose();
395         throw AsyncSocketException(AsyncSocketException::NOT_OPEN,
396                                   "failed to bind to async socket: " +
397                                   bindAddr.describe(),
398                                   errno);
399       }
400     }
401
402     // Apply the additional options if any.
403     for (const auto& opt: options) {
404       int rv = opt.first.apply(fd_, opt.second);
405       if (rv != 0) {
406         throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
407                                   withAddr("failed to set socket option"),
408                                   errno);
409       }
410     }
411
412     // Perform the connect()
413     address.getAddress(&addrStorage);
414
415     rv = ::connect(fd_, saddr, address.getActualSize());
416     if (rv < 0) {
417       if (errno == EINPROGRESS) {
418         // Connection in progress.
419         if (timeout > 0) {
420           // Start a timer in case the connection takes too long.
421           if (!writeTimeout_.scheduleTimeout(timeout)) {
422             throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
423                 withAddr("failed to schedule AsyncSocket connect timeout"));
424           }
425         }
426
427         // Register for write events, so we'll
428         // be notified when the connection finishes/fails.
429         // Note that we don't register for a persistent event here.
430         assert(eventFlags_ == EventHandler::NONE);
431         eventFlags_ = EventHandler::WRITE;
432         if (!ioHandler_.registerHandler(eventFlags_)) {
433           throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
434               withAddr("failed to register AsyncSocket connect handler"));
435         }
436         return;
437       } else {
438         throw AsyncSocketException(AsyncSocketException::NOT_OPEN,
439                                   "connect failed (immediately)", errno);
440       }
441     }
442
443     // If we're still here the connect() succeeded immediately.
444     // Fall through to call the callback outside of this try...catch block
445   } catch (const AsyncSocketException& ex) {
446     return failConnect(__func__, ex);
447   } catch (const std::exception& ex) {
448     // shouldn't happen, but handle it just in case
449     VLOG(4) << "AsyncSocket::connect(this=" << this << ", fd=" << fd_
450                << "): unexpected " << typeid(ex).name() << " exception: "
451                << ex.what();
452     AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR,
453                             withAddr(string("unexpected exception: ") +
454                                      ex.what()));
455     return failConnect(__func__, tex);
456   }
457
458   // The connection succeeded immediately
459   // The read callback may not have been set yet, and no writes may be pending
460   // yet, so we don't have to register for any events at the moment.
461   VLOG(8) << "AsyncSocket::connect succeeded immediately; this=" << this;
462   assert(readCallback_ == nullptr);
463   assert(writeReqHead_ == nullptr);
464   state_ = StateEnum::ESTABLISHED;
465   if (callback) {
466     connectCallback_ = nullptr;
467     callback->connectSuccess();
468   }
469 }
470
471 void AsyncSocket::connect(ConnectCallback* callback,
472                            const string& ip, uint16_t port,
473                            int timeout,
474                            const OptionMap &options) noexcept {
475   DestructorGuard dg(this);
476   try {
477     connectCallback_ = callback;
478     connect(callback, folly::SocketAddress(ip, port), timeout, options);
479   } catch (const std::exception& ex) {
480     AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR,
481                             ex.what());
482     return failConnect(__func__, tex);
483   }
484 }
485
486 void AsyncSocket::cancelConnect() {
487   connectCallback_ = nullptr;
488   if (state_ == StateEnum::CONNECTING) {
489     closeNow();
490   }
491 }
492
493 void AsyncSocket::setSendTimeout(uint32_t milliseconds) {
494   sendTimeout_ = milliseconds;
495   assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
496
497   // If we are currently pending on write requests, immediately update
498   // writeTimeout_ with the new value.
499   if ((eventFlags_ & EventHandler::WRITE) &&
500       (state_ != StateEnum::CONNECTING)) {
501     assert(state_ == StateEnum::ESTABLISHED);
502     assert((shutdownFlags_ & SHUT_WRITE) == 0);
503     if (sendTimeout_ > 0) {
504       if (!writeTimeout_.scheduleTimeout(sendTimeout_)) {
505         AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
506             withAddr("failed to reschedule send timeout in setSendTimeout"));
507         return failWrite(__func__, ex);
508       }
509     } else {
510       writeTimeout_.cancelTimeout();
511     }
512   }
513 }
514
515 void AsyncSocket::setReadCB(ReadCallback *callback) {
516   VLOG(6) << "AsyncSocket::setReadCallback() this=" << this << ", fd=" << fd_
517           << ", callback=" << callback << ", state=" << state_;
518
519   // Short circuit if callback is the same as the existing readCallback_.
520   //
521   // Note that this is needed for proper functioning during some cleanup cases.
522   // During cleanup we allow setReadCallback(nullptr) to be called even if the
523   // read callback is already unset and we have been detached from an event
524   // base.  This check prevents us from asserting
525   // eventBase_->isInEventBaseThread() when eventBase_ is nullptr.
526   if (callback == readCallback_) {
527     return;
528   }
529
530   /* We are removing a read callback */
531   if (callback == nullptr &&
532       immediateReadHandler_.isLoopCallbackScheduled()) {
533     immediateReadHandler_.cancelLoopCallback();
534   }
535
536   if (shutdownFlags_ & SHUT_READ) {
537     // Reads have already been shut down on this socket.
538     //
539     // Allow setReadCallback(nullptr) to be called in this case, but don't
540     // allow a new callback to be set.
541     //
542     // For example, setReadCallback(nullptr) can happen after an error if we
543     // invoke some other error callback before invoking readError().  The other
544     // error callback that is invoked first may go ahead and clear the read
545     // callback before we get a chance to invoke readError().
546     if (callback != nullptr) {
547       return invalidState(callback);
548     }
549     assert((eventFlags_ & EventHandler::READ) == 0);
550     readCallback_ = nullptr;
551     return;
552   }
553
554   DestructorGuard dg(this);
555   assert(eventBase_->isInEventBaseThread());
556
557   switch ((StateEnum)state_) {
558     case StateEnum::CONNECTING:
559       // For convenience, we allow the read callback to be set while we are
560       // still connecting.  We just store the callback for now.  Once the
561       // connection completes we'll register for read events.
562       readCallback_ = callback;
563       return;
564     case StateEnum::ESTABLISHED:
565     {
566       readCallback_ = callback;
567       uint16_t oldFlags = eventFlags_;
568       if (readCallback_) {
569         eventFlags_ |= EventHandler::READ;
570       } else {
571         eventFlags_ &= ~EventHandler::READ;
572       }
573
574       // Update our registration if our flags have changed
575       if (eventFlags_ != oldFlags) {
576         // We intentionally ignore the return value here.
577         // updateEventRegistration() will move us into the error state if it
578         // fails, and we don't need to do anything else here afterwards.
579         (void)updateEventRegistration();
580       }
581
582       if (readCallback_) {
583         checkForImmediateRead();
584       }
585       return;
586     }
587     case StateEnum::CLOSED:
588     case StateEnum::ERROR:
589       // We should never reach here.  SHUT_READ should always be set
590       // if we are in STATE_CLOSED or STATE_ERROR.
591       assert(false);
592       return invalidState(callback);
593     case StateEnum::UNINIT:
594       // We do not allow setReadCallback() to be called before we start
595       // connecting.
596       return invalidState(callback);
597   }
598
599   // We don't put a default case in the switch statement, so that the compiler
600   // will warn us to update the switch statement if a new state is added.
601   return invalidState(callback);
602 }
603
604 AsyncSocket::ReadCallback* AsyncSocket::getReadCallback() const {
605   return readCallback_;
606 }
607
608 void AsyncSocket::write(WriteCallback* callback,
609                          const void* buf, size_t bytes, WriteFlags flags) {
610   iovec op;
611   op.iov_base = const_cast<void*>(buf);
612   op.iov_len = bytes;
613   writeImpl(callback, &op, 1, std::move(unique_ptr<IOBuf>()), flags);
614 }
615
616 void AsyncSocket::writev(WriteCallback* callback,
617                           const iovec* vec,
618                           size_t count,
619                           WriteFlags flags) {
620   writeImpl(callback, vec, count, std::move(unique_ptr<IOBuf>()), flags);
621 }
622
623 void AsyncSocket::writeChain(WriteCallback* callback, unique_ptr<IOBuf>&& buf,
624                               WriteFlags flags) {
625   size_t count = buf->countChainElements();
626   if (count <= 64) {
627     iovec vec[count];
628     writeChainImpl(callback, vec, count, std::move(buf), flags);
629   } else {
630     iovec* vec = new iovec[count];
631     writeChainImpl(callback, vec, count, std::move(buf), flags);
632     delete[] vec;
633   }
634 }
635
636 void AsyncSocket::writeChainImpl(WriteCallback* callback, iovec* vec,
637     size_t count, unique_ptr<IOBuf>&& buf, WriteFlags flags) {
638   size_t veclen = buf->fillIov(vec, count);
639   writeImpl(callback, vec, veclen, std::move(buf), flags);
640 }
641
642 void AsyncSocket::writeImpl(WriteCallback* callback, const iovec* vec,
643                              size_t count, unique_ptr<IOBuf>&& buf,
644                              WriteFlags flags) {
645   VLOG(6) << "AsyncSocket::writev() this=" << this << ", fd=" << fd_
646           << ", callback=" << callback << ", count=" << count
647           << ", state=" << state_;
648   DestructorGuard dg(this);
649   unique_ptr<IOBuf>ioBuf(std::move(buf));
650   assert(eventBase_->isInEventBaseThread());
651
652   if (shutdownFlags_ & (SHUT_WRITE | SHUT_WRITE_PENDING)) {
653     // No new writes may be performed after the write side of the socket has
654     // been shutdown.
655     //
656     // We could just call callback->writeError() here to fail just this write.
657     // However, fail hard and use invalidState() to fail all outstanding
658     // callbacks and move the socket into the error state.  There's most likely
659     // a bug in the caller's code, so we abort everything rather than trying to
660     // proceed as best we can.
661     return invalidState(callback);
662   }
663
664   uint32_t countWritten = 0;
665   uint32_t partialWritten = 0;
666   int bytesWritten = 0;
667   bool mustRegister = false;
668   if (state_ == StateEnum::ESTABLISHED && !connecting()) {
669     if (writeReqHead_ == nullptr) {
670       // If we are established and there are no other writes pending,
671       // we can attempt to perform the write immediately.
672       assert(writeReqTail_ == nullptr);
673       assert((eventFlags_ & EventHandler::WRITE) == 0);
674
675       bytesWritten = performWrite(vec, count, flags,
676                                   &countWritten, &partialWritten);
677       if (bytesWritten < 0) {
678         AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
679                                withAddr("writev failed"), errno);
680         return failWrite(__func__, callback, 0, ex);
681       } else if (countWritten == count) {
682         // We successfully wrote everything.
683         // Invoke the callback and return.
684         if (callback) {
685           callback->writeSuccess();
686         }
687         return;
688       } // else { continue writing the next writeReq }
689       mustRegister = true;
690     }
691   } else if (!connecting()) {
692     // Invalid state for writing
693     return invalidState(callback);
694   }
695
696   // Create a new WriteRequest to add to the queue
697   WriteRequest* req;
698   try {
699     req = BytesWriteRequest::newRequest(this, callback, vec + countWritten,
700                                         count - countWritten, partialWritten,
701                                         bytesWritten, std::move(ioBuf), flags);
702   } catch (const std::exception& ex) {
703     // we mainly expect to catch std::bad_alloc here
704     AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR,
705         withAddr(string("failed to append new WriteRequest: ") + ex.what()));
706     return failWrite(__func__, callback, bytesWritten, tex);
707   }
708   req->consume();
709   if (writeReqTail_ == nullptr) {
710     assert(writeReqHead_ == nullptr);
711     writeReqHead_ = writeReqTail_ = req;
712   } else {
713     writeReqTail_->append(req);
714     writeReqTail_ = req;
715   }
716
717   // Register for write events if are established and not currently
718   // waiting on write events
719   if (mustRegister) {
720     assert(state_ == StateEnum::ESTABLISHED);
721     assert((eventFlags_ & EventHandler::WRITE) == 0);
722     if (!updateEventRegistration(EventHandler::WRITE, 0)) {
723       assert(state_ == StateEnum::ERROR);
724       return;
725     }
726     if (sendTimeout_ > 0) {
727       // Schedule a timeout to fire if the write takes too long.
728       if (!writeTimeout_.scheduleTimeout(sendTimeout_)) {
729         AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
730                                withAddr("failed to schedule send timeout"));
731         return failWrite(__func__, ex);
732       }
733     }
734   }
735 }
736
737 void AsyncSocket::writeRequest(WriteRequest* req) {
738   if (writeReqTail_ == nullptr) {
739     assert(writeReqHead_ == nullptr);
740     writeReqHead_ = writeReqTail_ = req;
741     req->start();
742   } else {
743     writeReqTail_->append(req);
744     writeReqTail_ = req;
745   }
746 }
747
748 void AsyncSocket::close() {
749   VLOG(5) << "AsyncSocket::close(): this=" << this << ", fd_=" << fd_
750           << ", state=" << state_ << ", shutdownFlags="
751           << std::hex << (int) shutdownFlags_;
752
753   // close() is only different from closeNow() when there are pending writes
754   // that need to drain before we can close.  In all other cases, just call
755   // closeNow().
756   //
757   // Note that writeReqHead_ can be non-nullptr even in STATE_CLOSED or
758   // STATE_ERROR if close() is invoked while a previous closeNow() or failure
759   // is still running.  (e.g., If there are multiple pending writes, and we
760   // call writeError() on the first one, it may call close().  In this case we
761   // will already be in STATE_CLOSED or STATE_ERROR, but the remaining pending
762   // writes will still be in the queue.)
763   //
764   // We only need to drain pending writes if we are still in STATE_CONNECTING
765   // or STATE_ESTABLISHED
766   if ((writeReqHead_ == nullptr) ||
767       !(state_ == StateEnum::CONNECTING ||
768       state_ == StateEnum::ESTABLISHED)) {
769     closeNow();
770     return;
771   }
772
773   // Declare a DestructorGuard to ensure that the AsyncSocket cannot be
774   // destroyed until close() returns.
775   DestructorGuard dg(this);
776   assert(eventBase_->isInEventBaseThread());
777
778   // Since there are write requests pending, we have to set the
779   // SHUT_WRITE_PENDING flag, and wait to perform the real close until the
780   // connect finishes and we finish writing these requests.
781   //
782   // Set SHUT_READ to indicate that reads are shut down, and set the
783   // SHUT_WRITE_PENDING flag to mark that we want to shutdown once the
784   // pending writes complete.
785   shutdownFlags_ |= (SHUT_READ | SHUT_WRITE_PENDING);
786
787   // If a read callback is set, invoke readEOF() immediately to inform it that
788   // the socket has been closed and no more data can be read.
789   if (readCallback_) {
790     // Disable reads if they are enabled
791     if (!updateEventRegistration(0, EventHandler::READ)) {
792       // We're now in the error state; callbacks have been cleaned up
793       assert(state_ == StateEnum::ERROR);
794       assert(readCallback_ == nullptr);
795     } else {
796       ReadCallback* callback = readCallback_;
797       readCallback_ = nullptr;
798       callback->readEOF();
799     }
800   }
801 }
802
803 void AsyncSocket::closeNow() {
804   VLOG(5) << "AsyncSocket::closeNow(): this=" << this << ", fd_=" << fd_
805           << ", state=" << state_ << ", shutdownFlags="
806           << std::hex << (int) shutdownFlags_;
807   DestructorGuard dg(this);
808   assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
809
810   switch (state_) {
811     case StateEnum::ESTABLISHED:
812     case StateEnum::CONNECTING:
813     {
814       shutdownFlags_ |= (SHUT_READ | SHUT_WRITE);
815       state_ = StateEnum::CLOSED;
816
817       // If the write timeout was set, cancel it.
818       writeTimeout_.cancelTimeout();
819
820       // If we are registered for I/O events, unregister.
821       if (eventFlags_ != EventHandler::NONE) {
822         eventFlags_ = EventHandler::NONE;
823         if (!updateEventRegistration()) {
824           // We will have been moved into the error state.
825           assert(state_ == StateEnum::ERROR);
826           return;
827         }
828       }
829
830       if (immediateReadHandler_.isLoopCallbackScheduled()) {
831         immediateReadHandler_.cancelLoopCallback();
832       }
833
834       if (fd_ >= 0) {
835         ioHandler_.changeHandlerFD(-1);
836         doClose();
837       }
838
839       if (connectCallback_) {
840         ConnectCallback* callback = connectCallback_;
841         connectCallback_ = nullptr;
842         callback->connectErr(socketClosedLocallyEx);
843       }
844
845       failAllWrites(socketClosedLocallyEx);
846
847       if (readCallback_) {
848         ReadCallback* callback = readCallback_;
849         readCallback_ = nullptr;
850         callback->readEOF();
851       }
852       return;
853     }
854     case StateEnum::CLOSED:
855       // Do nothing.  It's possible that we are being called recursively
856       // from inside a callback that we invoked inside another call to close()
857       // that is still running.
858       return;
859     case StateEnum::ERROR:
860       // Do nothing.  The error handling code has performed (or is performing)
861       // cleanup.
862       return;
863     case StateEnum::UNINIT:
864       assert(eventFlags_ == EventHandler::NONE);
865       assert(connectCallback_ == nullptr);
866       assert(readCallback_ == nullptr);
867       assert(writeReqHead_ == nullptr);
868       shutdownFlags_ |= (SHUT_READ | SHUT_WRITE);
869       state_ = StateEnum::CLOSED;
870       return;
871   }
872
873   LOG(DFATAL) << "AsyncSocket::closeNow() (this=" << this << ", fd=" << fd_
874               << ") called in unknown state " << state_;
875 }
876
877 void AsyncSocket::closeWithReset() {
878   // Enable SO_LINGER, with the linger timeout set to 0.
879   // This will trigger a TCP reset when we close the socket.
880   if (fd_ >= 0) {
881     struct linger optLinger = {1, 0};
882     if (setSockOpt(SOL_SOCKET, SO_LINGER, &optLinger) != 0) {
883       VLOG(2) << "AsyncSocket::closeWithReset(): error setting SO_LINGER "
884               << "on " << fd_ << ": errno=" << errno;
885     }
886   }
887
888   // Then let closeNow() take care of the rest
889   closeNow();
890 }
891
892 void AsyncSocket::shutdownWrite() {
893   VLOG(5) << "AsyncSocket::shutdownWrite(): this=" << this << ", fd=" << fd_
894           << ", state=" << state_ << ", shutdownFlags="
895           << std::hex << (int) shutdownFlags_;
896
897   // If there are no pending writes, shutdownWrite() is identical to
898   // shutdownWriteNow().
899   if (writeReqHead_ == nullptr) {
900     shutdownWriteNow();
901     return;
902   }
903
904   assert(eventBase_->isInEventBaseThread());
905
906   // There are pending writes.  Set SHUT_WRITE_PENDING so that the actual
907   // shutdown will be performed once all writes complete.
908   shutdownFlags_ |= SHUT_WRITE_PENDING;
909 }
910
911 void AsyncSocket::shutdownWriteNow() {
912   VLOG(5) << "AsyncSocket::shutdownWriteNow(): this=" << this
913           << ", fd=" << fd_ << ", state=" << state_
914           << ", shutdownFlags=" << std::hex << (int) shutdownFlags_;
915
916   if (shutdownFlags_ & SHUT_WRITE) {
917     // Writes are already shutdown; nothing else to do.
918     return;
919   }
920
921   // If SHUT_READ is already set, just call closeNow() to completely
922   // close the socket.  This can happen if close() was called with writes
923   // pending, and then shutdownWriteNow() is called before all pending writes
924   // complete.
925   if (shutdownFlags_ & SHUT_READ) {
926     closeNow();
927     return;
928   }
929
930   DestructorGuard dg(this);
931   assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
932
933   switch (static_cast<StateEnum>(state_)) {
934     case StateEnum::ESTABLISHED:
935     {
936       shutdownFlags_ |= SHUT_WRITE;
937
938       // If the write timeout was set, cancel it.
939       writeTimeout_.cancelTimeout();
940
941       // If we are registered for write events, unregister.
942       if (!updateEventRegistration(0, EventHandler::WRITE)) {
943         // We will have been moved into the error state.
944         assert(state_ == StateEnum::ERROR);
945         return;
946       }
947
948       // Shutdown writes on the file descriptor
949       ::shutdown(fd_, SHUT_WR);
950
951       // Immediately fail all write requests
952       failAllWrites(socketShutdownForWritesEx);
953       return;
954     }
955     case StateEnum::CONNECTING:
956     {
957       // Set the SHUT_WRITE_PENDING flag.
958       // When the connection completes, it will check this flag,
959       // shutdown the write half of the socket, and then set SHUT_WRITE.
960       shutdownFlags_ |= SHUT_WRITE_PENDING;
961
962       // Immediately fail all write requests
963       failAllWrites(socketShutdownForWritesEx);
964       return;
965     }
966     case StateEnum::UNINIT:
967       // Callers normally shouldn't call shutdownWriteNow() before the socket
968       // even starts connecting.  Nonetheless, go ahead and set
969       // SHUT_WRITE_PENDING.  Once the socket eventually connects it will
970       // immediately shut down the write side of the socket.
971       shutdownFlags_ |= SHUT_WRITE_PENDING;
972       return;
973     case StateEnum::CLOSED:
974     case StateEnum::ERROR:
975       // We should never get here.  SHUT_WRITE should always be set
976       // in STATE_CLOSED and STATE_ERROR.
977       VLOG(4) << "AsyncSocket::shutdownWriteNow() (this=" << this
978                  << ", fd=" << fd_ << ") in unexpected state " << state_
979                  << " with SHUT_WRITE not set ("
980                  << std::hex << (int) shutdownFlags_ << ")";
981       assert(false);
982       return;
983   }
984
985   LOG(DFATAL) << "AsyncSocket::shutdownWriteNow() (this=" << this << ", fd="
986               << fd_ << ") called in unknown state " << state_;
987 }
988
989 bool AsyncSocket::readable() const {
990   if (fd_ == -1) {
991     return false;
992   }
993   struct pollfd fds[1];
994   fds[0].fd = fd_;
995   fds[0].events = POLLIN;
996   fds[0].revents = 0;
997   int rc = poll(fds, 1, 0);
998   return rc == 1;
999 }
1000
1001 bool AsyncSocket::isPending() const {
1002   return ioHandler_.isPending();
1003 }
1004
1005 bool AsyncSocket::hangup() const {
1006   if (fd_ == -1) {
1007     // sanity check, no one should ask for hangup if we are not connected.
1008     assert(false);
1009     return false;
1010   }
1011 #ifdef POLLRDHUP // Linux-only
1012   struct pollfd fds[1];
1013   fds[0].fd = fd_;
1014   fds[0].events = POLLRDHUP|POLLHUP;
1015   fds[0].revents = 0;
1016   poll(fds, 1, 0);
1017   return (fds[0].revents & (POLLRDHUP|POLLHUP)) != 0;
1018 #else
1019   return false;
1020 #endif
1021 }
1022
1023 bool AsyncSocket::good() const {
1024   return ((state_ == StateEnum::CONNECTING ||
1025           state_ == StateEnum::ESTABLISHED) &&
1026           (shutdownFlags_ == 0) && (eventBase_ != nullptr));
1027 }
1028
1029 bool AsyncSocket::error() const {
1030   return (state_ == StateEnum::ERROR);
1031 }
1032
1033 void AsyncSocket::attachEventBase(EventBase* eventBase) {
1034   VLOG(5) << "AsyncSocket::attachEventBase(this=" << this << ", fd=" << fd_
1035           << ", old evb=" << eventBase_ << ", new evb=" << eventBase
1036           << ", state=" << state_ << ", events="
1037           << std::hex << eventFlags_ << ")";
1038   assert(eventBase_ == nullptr);
1039   assert(eventBase->isInEventBaseThread());
1040
1041   eventBase_ = eventBase;
1042   ioHandler_.attachEventBase(eventBase);
1043   writeTimeout_.attachEventBase(eventBase);
1044 }
1045
1046 void AsyncSocket::detachEventBase() {
1047   VLOG(5) << "AsyncSocket::detachEventBase(this=" << this << ", fd=" << fd_
1048           << ", old evb=" << eventBase_ << ", state=" << state_
1049           << ", events=" << std::hex << eventFlags_ << ")";
1050   assert(eventBase_ != nullptr);
1051   assert(eventBase_->isInEventBaseThread());
1052
1053   eventBase_ = nullptr;
1054   ioHandler_.detachEventBase();
1055   writeTimeout_.detachEventBase();
1056 }
1057
1058 bool AsyncSocket::isDetachable() const {
1059   DCHECK(eventBase_ != nullptr);
1060   DCHECK(eventBase_->isInEventBaseThread());
1061
1062   return !ioHandler_.isHandlerRegistered() && !writeTimeout_.isScheduled();
1063 }
1064
1065 void AsyncSocket::getLocalAddress(folly::SocketAddress* address) const {
1066   address->setFromLocalAddress(fd_);
1067 }
1068
1069 void AsyncSocket::getPeerAddress(folly::SocketAddress* address) const {
1070   if (!addr_.isInitialized()) {
1071     addr_.setFromPeerAddress(fd_);
1072   }
1073   *address = addr_;
1074 }
1075
1076 int AsyncSocket::setNoDelay(bool noDelay) {
1077   if (fd_ < 0) {
1078     VLOG(4) << "AsyncSocket::setNoDelay() called on non-open socket "
1079                << this << "(state=" << state_ << ")";
1080     return EINVAL;
1081
1082   }
1083
1084   int value = noDelay ? 1 : 0;
1085   if (setsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &value, sizeof(value)) != 0) {
1086     int errnoCopy = errno;
1087     VLOG(2) << "failed to update TCP_NODELAY option on AsyncSocket "
1088             << this << " (fd=" << fd_ << ", state=" << state_ << "): "
1089             << strerror(errnoCopy);
1090     return errnoCopy;
1091   }
1092
1093   return 0;
1094 }
1095
1096 int AsyncSocket::setCongestionFlavor(const std::string &cname) {
1097
1098   #ifndef TCP_CONGESTION
1099   #define TCP_CONGESTION  13
1100   #endif
1101
1102   if (fd_ < 0) {
1103     VLOG(4) << "AsyncSocket::setCongestionFlavor() called on non-open "
1104                << "socket " << this << "(state=" << state_ << ")";
1105     return EINVAL;
1106
1107   }
1108
1109   if (setsockopt(fd_, IPPROTO_TCP, TCP_CONGESTION, cname.c_str(),
1110         cname.length() + 1) != 0) {
1111     int errnoCopy = errno;
1112     VLOG(2) << "failed to update TCP_CONGESTION option on AsyncSocket "
1113             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1114             << strerror(errnoCopy);
1115     return errnoCopy;
1116   }
1117
1118   return 0;
1119 }
1120
1121 int AsyncSocket::setQuickAck(bool quickack) {
1122   if (fd_ < 0) {
1123     VLOG(4) << "AsyncSocket::setQuickAck() called on non-open socket "
1124                << this << "(state=" << state_ << ")";
1125     return EINVAL;
1126
1127   }
1128
1129 #ifdef TCP_QUICKACK // Linux-only
1130   int value = quickack ? 1 : 0;
1131   if (setsockopt(fd_, IPPROTO_TCP, TCP_QUICKACK, &value, sizeof(value)) != 0) {
1132     int errnoCopy = errno;
1133     VLOG(2) << "failed to update TCP_QUICKACK option on AsyncSocket"
1134             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1135             << strerror(errnoCopy);
1136     return errnoCopy;
1137   }
1138
1139   return 0;
1140 #else
1141   return ENOSYS;
1142 #endif
1143 }
1144
1145 int AsyncSocket::setSendBufSize(size_t bufsize) {
1146   if (fd_ < 0) {
1147     VLOG(4) << "AsyncSocket::setSendBufSize() called on non-open socket "
1148                << this << "(state=" << state_ << ")";
1149     return EINVAL;
1150   }
1151
1152   if (setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &bufsize, sizeof(bufsize)) !=0) {
1153     int errnoCopy = errno;
1154     VLOG(2) << "failed to update SO_SNDBUF option on AsyncSocket"
1155             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1156             << strerror(errnoCopy);
1157     return errnoCopy;
1158   }
1159
1160   return 0;
1161 }
1162
1163 int AsyncSocket::setRecvBufSize(size_t bufsize) {
1164   if (fd_ < 0) {
1165     VLOG(4) << "AsyncSocket::setRecvBufSize() called on non-open socket "
1166                << this << "(state=" << state_ << ")";
1167     return EINVAL;
1168   }
1169
1170   if (setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &bufsize, sizeof(bufsize)) !=0) {
1171     int errnoCopy = errno;
1172     VLOG(2) << "failed to update SO_RCVBUF option on AsyncSocket"
1173             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1174             << strerror(errnoCopy);
1175     return errnoCopy;
1176   }
1177
1178   return 0;
1179 }
1180
1181 int AsyncSocket::setTCPProfile(int profd) {
1182   if (fd_ < 0) {
1183     VLOG(4) << "AsyncSocket::setTCPProfile() called on non-open socket "
1184                << this << "(state=" << state_ << ")";
1185     return EINVAL;
1186   }
1187
1188   if (setsockopt(fd_, SOL_SOCKET, SO_SET_NAMESPACE, &profd, sizeof(int)) !=0) {
1189     int errnoCopy = errno;
1190     VLOG(2) << "failed to set socket namespace option on AsyncSocket"
1191             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1192             << strerror(errnoCopy);
1193     return errnoCopy;
1194   }
1195
1196   return 0;
1197 }
1198
1199 void AsyncSocket::ioReady(uint16_t events) noexcept {
1200   VLOG(7) << "AsyncSocket::ioRead() this=" << this << ", fd" << fd_
1201           << ", events=" << std::hex << events << ", state=" << state_;
1202   DestructorGuard dg(this);
1203   assert(events & EventHandler::READ_WRITE);
1204   assert(eventBase_->isInEventBaseThread());
1205
1206   uint16_t relevantEvents = events & EventHandler::READ_WRITE;
1207   if (relevantEvents == EventHandler::READ) {
1208     handleRead();
1209   } else if (relevantEvents == EventHandler::WRITE) {
1210     handleWrite();
1211   } else if (relevantEvents == EventHandler::READ_WRITE) {
1212     EventBase* originalEventBase = eventBase_;
1213     // If both read and write events are ready, process writes first.
1214     handleWrite();
1215
1216     // Return now if handleWrite() detached us from our EventBase
1217     if (eventBase_ != originalEventBase) {
1218       return;
1219     }
1220
1221     // Only call handleRead() if a read callback is still installed.
1222     // (It's possible that the read callback was uninstalled during
1223     // handleWrite().)
1224     if (readCallback_) {
1225       handleRead();
1226     }
1227   } else {
1228     VLOG(4) << "AsyncSocket::ioRead() called with unexpected events "
1229                << std::hex << events << "(this=" << this << ")";
1230     abort();
1231   }
1232 }
1233
1234 ssize_t AsyncSocket::performRead(void** buf, size_t* buflen, size_t* offset) {
1235   VLOG(5) << "AsyncSocket::performRead() this=" << this
1236           << ", buf=" << *buf << ", buflen=" << *buflen;
1237
1238   ssize_t bytes = recv(fd_, *buf, *buflen, MSG_DONTWAIT);
1239   if (bytes < 0) {
1240     if (errno == EAGAIN || errno == EWOULDBLOCK) {
1241       // No more data to read right now.
1242       return READ_BLOCKING;
1243     } else {
1244       return READ_ERROR;
1245     }
1246   } else {
1247     appBytesReceived_ += bytes;
1248     return bytes;
1249   }
1250 }
1251
1252 void AsyncSocket::prepareReadBuffer(void** buf, size_t* buflen) noexcept {
1253   // no matter what, buffer should be preapared for non-ssl socket
1254   CHECK(readCallback_);
1255   readCallback_->getReadBuffer(buf, buflen);
1256 }
1257
1258 void AsyncSocket::handleRead() noexcept {
1259   VLOG(5) << "AsyncSocket::handleRead() this=" << this << ", fd=" << fd_
1260           << ", state=" << state_;
1261   assert(state_ == StateEnum::ESTABLISHED);
1262   assert((shutdownFlags_ & SHUT_READ) == 0);
1263   assert(readCallback_ != nullptr);
1264   assert(eventFlags_ & EventHandler::READ);
1265
1266   // Loop until:
1267   // - a read attempt would block
1268   // - readCallback_ is uninstalled
1269   // - the number of loop iterations exceeds the optional maximum
1270   // - this AsyncSocket is moved to another EventBase
1271   //
1272   // When we invoke readDataAvailable() it may uninstall the readCallback_,
1273   // which is why need to check for it here.
1274   //
1275   // The last bullet point is slightly subtle.  readDataAvailable() may also
1276   // detach this socket from this EventBase.  However, before
1277   // readDataAvailable() returns another thread may pick it up, attach it to
1278   // a different EventBase, and install another readCallback_.  We need to
1279   // exit immediately after readDataAvailable() returns if the eventBase_ has
1280   // changed.  (The caller must perform some sort of locking to transfer the
1281   // AsyncSocket between threads properly.  This will be sufficient to ensure
1282   // that this thread sees the updated eventBase_ variable after
1283   // readDataAvailable() returns.)
1284   uint16_t numReads = 0;
1285   EventBase* originalEventBase = eventBase_;
1286   while (readCallback_ && eventBase_ == originalEventBase) {
1287     // Get the buffer to read into.
1288     void* buf = nullptr;
1289     size_t buflen = 0, offset = 0;
1290     try {
1291       prepareReadBuffer(&buf, &buflen);
1292       VLOG(5) << "prepareReadBuffer() buf=" << buf << ", buflen=" << buflen;
1293     } catch (const AsyncSocketException& ex) {
1294       return failRead(__func__, ex);
1295     } catch (const std::exception& ex) {
1296       AsyncSocketException tex(AsyncSocketException::BAD_ARGS,
1297                               string("ReadCallback::getReadBuffer() "
1298                                      "threw exception: ") +
1299                               ex.what());
1300       return failRead(__func__, tex);
1301     } catch (...) {
1302       AsyncSocketException ex(AsyncSocketException::BAD_ARGS,
1303                              "ReadCallback::getReadBuffer() threw "
1304                              "non-exception type");
1305       return failRead(__func__, ex);
1306     }
1307     if (!isBufferMovable_ && (buf == nullptr || buflen == 0)) {
1308       AsyncSocketException ex(AsyncSocketException::BAD_ARGS,
1309                              "ReadCallback::getReadBuffer() returned "
1310                              "empty buffer");
1311       return failRead(__func__, ex);
1312     }
1313
1314     // Perform the read
1315     ssize_t bytesRead = performRead(&buf, &buflen, &offset);
1316     VLOG(4) << "this=" << this << ", AsyncSocket::handleRead() got "
1317             << bytesRead << " bytes";
1318     if (bytesRead > 0) {
1319       if (!isBufferMovable_) {
1320         readCallback_->readDataAvailable(bytesRead);
1321       } else {
1322         CHECK(kOpenSslModeMoveBufferOwnership);
1323         VLOG(5) << "this=" << this << ", AsyncSocket::handleRead() got "
1324                 << "buf=" << buf << ", " << bytesRead << "/" << buflen
1325                 << ", offset=" << offset;
1326         auto readBuf = folly::IOBuf::takeOwnership(buf, buflen);
1327         readBuf->trimStart(offset);
1328         readBuf->trimEnd(buflen - offset - bytesRead);
1329         readCallback_->readBufferAvailable(std::move(readBuf));
1330       }
1331
1332       // Fall through and continue around the loop if the read
1333       // completely filled the available buffer.
1334       // Note that readCallback_ may have been uninstalled or changed inside
1335       // readDataAvailable().
1336       if (size_t(bytesRead) < buflen) {
1337         return;
1338       }
1339     } else if (bytesRead == READ_BLOCKING) {
1340         // No more data to read right now.
1341         return;
1342     } else if (bytesRead == READ_ERROR) {
1343       AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1344                              withAddr("recv() failed"), errno);
1345       return failRead(__func__, ex);
1346     } else {
1347       assert(bytesRead == READ_EOF);
1348       // EOF
1349       shutdownFlags_ |= SHUT_READ;
1350       if (!updateEventRegistration(0, EventHandler::READ)) {
1351         // we've already been moved into STATE_ERROR
1352         assert(state_ == StateEnum::ERROR);
1353         assert(readCallback_ == nullptr);
1354         return;
1355       }
1356
1357       ReadCallback* callback = readCallback_;
1358       readCallback_ = nullptr;
1359       callback->readEOF();
1360       return;
1361     }
1362     if (maxReadsPerEvent_ && (++numReads >= maxReadsPerEvent_)) {
1363       if (readCallback_ != nullptr) {
1364         // We might still have data in the socket.
1365         // (e.g. see comment in AsyncSSLSocket::checkForImmediateRead)
1366         scheduleImmediateRead();
1367       }
1368       return;
1369     }
1370   }
1371 }
1372
1373 /**
1374  * This function attempts to write as much data as possible, until no more data
1375  * can be written.
1376  *
1377  * - If it sends all available data, it unregisters for write events, and stops
1378  *   the writeTimeout_.
1379  *
1380  * - If not all of the data can be sent immediately, it reschedules
1381  *   writeTimeout_ (if a non-zero timeout is set), and ensures the handler is
1382  *   registered for write events.
1383  */
1384 void AsyncSocket::handleWrite() noexcept {
1385   VLOG(5) << "AsyncSocket::handleWrite() this=" << this << ", fd=" << fd_
1386           << ", state=" << state_;
1387   if (state_ == StateEnum::CONNECTING) {
1388     handleConnect();
1389     return;
1390   }
1391
1392   // Normal write
1393   assert(state_ == StateEnum::ESTABLISHED);
1394   assert((shutdownFlags_ & SHUT_WRITE) == 0);
1395   assert(writeReqHead_ != nullptr);
1396
1397   // Loop until we run out of write requests,
1398   // or until this socket is moved to another EventBase.
1399   // (See the comment in handleRead() explaining how this can happen.)
1400   EventBase* originalEventBase = eventBase_;
1401   while (writeReqHead_ != nullptr && eventBase_ == originalEventBase) {
1402     if (!writeReqHead_->performWrite()) {
1403       AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1404                              withAddr("writev() failed"), errno);
1405       return failWrite(__func__, ex);
1406     } else if (writeReqHead_->isComplete()) {
1407       // We finished this request
1408       WriteRequest* req = writeReqHead_;
1409       writeReqHead_ = req->getNext();
1410
1411       if (writeReqHead_ == nullptr) {
1412         writeReqTail_ = nullptr;
1413         // This is the last write request.
1414         // Unregister for write events and cancel the send timer
1415         // before we invoke the callback.  We have to update the state properly
1416         // before calling the callback, since it may want to detach us from
1417         // the EventBase.
1418         if (eventFlags_ & EventHandler::WRITE) {
1419           if (!updateEventRegistration(0, EventHandler::WRITE)) {
1420             assert(state_ == StateEnum::ERROR);
1421             return;
1422           }
1423           // Stop the send timeout
1424           writeTimeout_.cancelTimeout();
1425         }
1426         assert(!writeTimeout_.isScheduled());
1427
1428         // If SHUT_WRITE_PENDING is set, we should shutdown the socket after
1429         // we finish sending the last write request.
1430         //
1431         // We have to do this before invoking writeSuccess(), since
1432         // writeSuccess() may detach us from our EventBase.
1433         if (shutdownFlags_ & SHUT_WRITE_PENDING) {
1434           assert(connectCallback_ == nullptr);
1435           shutdownFlags_ |= SHUT_WRITE;
1436
1437           if (shutdownFlags_ & SHUT_READ) {
1438             // Reads have already been shutdown.  Fully close the socket and
1439             // move to STATE_CLOSED.
1440             //
1441             // Note: This code currently moves us to STATE_CLOSED even if
1442             // close() hasn't ever been called.  This can occur if we have
1443             // received EOF from the peer and shutdownWrite() has been called
1444             // locally.  Should we bother staying in STATE_ESTABLISHED in this
1445             // case, until close() is actually called?  I can't think of a
1446             // reason why we would need to do so.  No other operations besides
1447             // calling close() or destroying the socket can be performed at
1448             // this point.
1449             assert(readCallback_ == nullptr);
1450             state_ = StateEnum::CLOSED;
1451             if (fd_ >= 0) {
1452               ioHandler_.changeHandlerFD(-1);
1453               doClose();
1454             }
1455           } else {
1456             // Reads are still enabled, so we are only doing a half-shutdown
1457             ::shutdown(fd_, SHUT_WR);
1458           }
1459         }
1460       }
1461
1462       // Invoke the callback
1463       WriteCallback* callback = req->getCallback();
1464       req->destroy();
1465       if (callback) {
1466         callback->writeSuccess();
1467       }
1468       // We'll continue around the loop, trying to write another request
1469     } else {
1470       // Partial write.
1471       writeReqHead_->consume();
1472       // Stop after a partial write; it's highly likely that a subsequent write
1473       // attempt will just return EAGAIN.
1474       //
1475       // Ensure that we are registered for write events.
1476       if ((eventFlags_ & EventHandler::WRITE) == 0) {
1477         if (!updateEventRegistration(EventHandler::WRITE, 0)) {
1478           assert(state_ == StateEnum::ERROR);
1479           return;
1480         }
1481       }
1482
1483       // Reschedule the send timeout, since we have made some write progress.
1484       if (sendTimeout_ > 0) {
1485         if (!writeTimeout_.scheduleTimeout(sendTimeout_)) {
1486           AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1487               withAddr("failed to reschedule write timeout"));
1488           return failWrite(__func__, ex);
1489         }
1490       }
1491       return;
1492     }
1493   }
1494 }
1495
1496 void AsyncSocket::checkForImmediateRead() noexcept {
1497   // We currently don't attempt to perform optimistic reads in AsyncSocket.
1498   // (However, note that some subclasses do override this method.)
1499   //
1500   // Simply calling handleRead() here would be bad, as this would call
1501   // readCallback_->getReadBuffer(), forcing the callback to allocate a read
1502   // buffer even though no data may be available.  This would waste lots of
1503   // memory, since the buffer will sit around unused until the socket actually
1504   // becomes readable.
1505   //
1506   // Checking if the socket is readable now also seems like it would probably
1507   // be a pessimism.  In most cases it probably wouldn't be readable, and we
1508   // would just waste an extra system call.  Even if it is readable, waiting to
1509   // find out from libevent on the next event loop doesn't seem that bad.
1510 }
1511
1512 void AsyncSocket::handleInitialReadWrite() noexcept {
1513   // Our callers should already be holding a DestructorGuard, but grab
1514   // one here just to make sure, in case one of our calling code paths ever
1515   // changes.
1516   DestructorGuard dg(this);
1517
1518   // If we have a readCallback_, make sure we enable read events.  We
1519   // may already be registered for reads if connectSuccess() set
1520   // the read calback.
1521   if (readCallback_ && !(eventFlags_ & EventHandler::READ)) {
1522     assert(state_ == StateEnum::ESTABLISHED);
1523     assert((shutdownFlags_ & SHUT_READ) == 0);
1524     if (!updateEventRegistration(EventHandler::READ, 0)) {
1525       assert(state_ == StateEnum::ERROR);
1526       return;
1527     }
1528     checkForImmediateRead();
1529   } else if (readCallback_ == nullptr) {
1530     // Unregister for read events.
1531     updateEventRegistration(0, EventHandler::READ);
1532   }
1533
1534   // If we have write requests pending, try to send them immediately.
1535   // Since we just finished accepting, there is a very good chance that we can
1536   // write without blocking.
1537   //
1538   // However, we only process them if EventHandler::WRITE is not already set,
1539   // which means that we're already blocked on a write attempt.  (This can
1540   // happen if connectSuccess() called write() before returning.)
1541   if (writeReqHead_ && !(eventFlags_ & EventHandler::WRITE)) {
1542     // Call handleWrite() to perform write processing.
1543     handleWrite();
1544   } else if (writeReqHead_ == nullptr) {
1545     // Unregister for write event.
1546     updateEventRegistration(0, EventHandler::WRITE);
1547   }
1548 }
1549
1550 void AsyncSocket::handleConnect() noexcept {
1551   VLOG(5) << "AsyncSocket::handleConnect() this=" << this << ", fd=" << fd_
1552           << ", state=" << state_;
1553   assert(state_ == StateEnum::CONNECTING);
1554   // SHUT_WRITE can never be set while we are still connecting;
1555   // SHUT_WRITE_PENDING may be set, be we only set SHUT_WRITE once the connect
1556   // finishes
1557   assert((shutdownFlags_ & SHUT_WRITE) == 0);
1558
1559   // In case we had a connect timeout, cancel the timeout
1560   writeTimeout_.cancelTimeout();
1561   // We don't use a persistent registration when waiting on a connect event,
1562   // so we have been automatically unregistered now.  Update eventFlags_ to
1563   // reflect reality.
1564   assert(eventFlags_ == EventHandler::WRITE);
1565   eventFlags_ = EventHandler::NONE;
1566
1567   // Call getsockopt() to check if the connect succeeded
1568   int error;
1569   socklen_t len = sizeof(error);
1570   int rv = getsockopt(fd_, SOL_SOCKET, SO_ERROR, &error, &len);
1571   if (rv != 0) {
1572     AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1573                            withAddr("error calling getsockopt() after connect"),
1574                            errno);
1575     VLOG(4) << "AsyncSocket::handleConnect(this=" << this << ", fd="
1576                << fd_ << " host=" << addr_.describe()
1577                << ") exception:" << ex.what();
1578     return failConnect(__func__, ex);
1579   }
1580
1581   if (error != 0) {
1582     AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
1583                            "connect failed", error);
1584     VLOG(1) << "AsyncSocket::handleConnect(this=" << this << ", fd="
1585             << fd_ << " host=" << addr_.describe()
1586             << ") exception: " << ex.what();
1587     return failConnect(__func__, ex);
1588   }
1589
1590   // Move into STATE_ESTABLISHED
1591   state_ = StateEnum::ESTABLISHED;
1592
1593   // If SHUT_WRITE_PENDING is set and we don't have any write requests to
1594   // perform, immediately shutdown the write half of the socket.
1595   if ((shutdownFlags_ & SHUT_WRITE_PENDING) && writeReqHead_ == nullptr) {
1596     // SHUT_READ shouldn't be set.  If close() is called on the socket while we
1597     // are still connecting we just abort the connect rather than waiting for
1598     // it to complete.
1599     assert((shutdownFlags_ & SHUT_READ) == 0);
1600     ::shutdown(fd_, SHUT_WR);
1601     shutdownFlags_ |= SHUT_WRITE;
1602   }
1603
1604   VLOG(7) << "AsyncSocket " << this << ": fd " << fd_
1605           << "successfully connected; state=" << state_;
1606
1607   // Remember the EventBase we are attached to, before we start invoking any
1608   // callbacks (since the callbacks may call detachEventBase()).
1609   EventBase* originalEventBase = eventBase_;
1610
1611   // Call the connect callback.
1612   if (connectCallback_) {
1613     ConnectCallback* callback = connectCallback_;
1614     connectCallback_ = nullptr;
1615     callback->connectSuccess();
1616   }
1617
1618   // Note that the connect callback may have changed our state.
1619   // (set or unset the read callback, called write(), closed the socket, etc.)
1620   // The following code needs to handle these situations correctly.
1621   //
1622   // If the socket has been closed, readCallback_ and writeReqHead_ will
1623   // always be nullptr, so that will prevent us from trying to read or write.
1624   //
1625   // The main thing to check for is if eventBase_ is still originalEventBase.
1626   // If not, we have been detached from this event base, so we shouldn't
1627   // perform any more operations.
1628   if (eventBase_ != originalEventBase) {
1629     return;
1630   }
1631
1632   handleInitialReadWrite();
1633 }
1634
1635 void AsyncSocket::timeoutExpired() noexcept {
1636   VLOG(7) << "AsyncSocket " << this << ", fd " << fd_ << ": timeout expired: "
1637           << "state=" << state_ << ", events=" << std::hex << eventFlags_;
1638   DestructorGuard dg(this);
1639   assert(eventBase_->isInEventBaseThread());
1640
1641   if (state_ == StateEnum::CONNECTING) {
1642     // connect() timed out
1643     // Unregister for I/O events.
1644     AsyncSocketException ex(AsyncSocketException::TIMED_OUT,
1645                            "connect timed out");
1646     failConnect(__func__, ex);
1647   } else {
1648     // a normal write operation timed out
1649     assert(state_ == StateEnum::ESTABLISHED);
1650     AsyncSocketException ex(AsyncSocketException::TIMED_OUT, "write timed out");
1651     failWrite(__func__, ex);
1652   }
1653 }
1654
1655 ssize_t AsyncSocket::performWrite(const iovec* vec,
1656                                    uint32_t count,
1657                                    WriteFlags flags,
1658                                    uint32_t* countWritten,
1659                                    uint32_t* partialWritten) {
1660   // We use sendmsg() instead of writev() so that we can pass in MSG_NOSIGNAL
1661   // We correctly handle EPIPE errors, so we never want to receive SIGPIPE
1662   // (since it may terminate the program if the main program doesn't explicitly
1663   // ignore it).
1664   struct msghdr msg;
1665   msg.msg_name = nullptr;
1666   msg.msg_namelen = 0;
1667   msg.msg_iov = const_cast<iovec *>(vec);
1668 #ifdef IOV_MAX // not defined on Android
1669   msg.msg_iovlen = std::min(count, (uint32_t)IOV_MAX);
1670 #else
1671   msg.msg_iovlen = std::min(count, (uint32_t)UIO_MAXIOV);
1672 #endif
1673   msg.msg_control = nullptr;
1674   msg.msg_controllen = 0;
1675   msg.msg_flags = 0;
1676
1677   int msg_flags = MSG_DONTWAIT;
1678
1679 #ifdef MSG_NOSIGNAL // Linux-only
1680   msg_flags |= MSG_NOSIGNAL;
1681   if (isSet(flags, WriteFlags::CORK)) {
1682     // MSG_MORE tells the kernel we have more data to send, so wait for us to
1683     // give it the rest of the data rather than immediately sending a partial
1684     // frame, even when TCP_NODELAY is enabled.
1685     msg_flags |= MSG_MORE;
1686   }
1687 #endif
1688   if (isSet(flags, WriteFlags::EOR)) {
1689     // marks that this is the last byte of a record (response)
1690     msg_flags |= MSG_EOR;
1691   }
1692   ssize_t totalWritten = ::sendmsg(fd_, &msg, msg_flags);
1693   if (totalWritten < 0) {
1694     if (errno == EAGAIN) {
1695       // TCP buffer is full; we can't write any more data right now.
1696       *countWritten = 0;
1697       *partialWritten = 0;
1698       return 0;
1699     }
1700     // error
1701     *countWritten = 0;
1702     *partialWritten = 0;
1703     return -1;
1704   }
1705
1706   appBytesWritten_ += totalWritten;
1707
1708   uint32_t bytesWritten;
1709   uint32_t n;
1710   for (bytesWritten = totalWritten, n = 0; n < count; ++n) {
1711     const iovec* v = vec + n;
1712     if (v->iov_len > bytesWritten) {
1713       // Partial write finished in the middle of this iovec
1714       *countWritten = n;
1715       *partialWritten = bytesWritten;
1716       return totalWritten;
1717     }
1718
1719     bytesWritten -= v->iov_len;
1720   }
1721
1722   assert(bytesWritten == 0);
1723   *countWritten = n;
1724   *partialWritten = 0;
1725   return totalWritten;
1726 }
1727
1728 /**
1729  * Re-register the EventHandler after eventFlags_ has changed.
1730  *
1731  * If an error occurs, fail() is called to move the socket into the error state
1732  * and call all currently installed callbacks.  After an error, the
1733  * AsyncSocket is completely unregistered.
1734  *
1735  * @return Returns true on succcess, or false on error.
1736  */
1737 bool AsyncSocket::updateEventRegistration() {
1738   VLOG(5) << "AsyncSocket::updateEventRegistration(this=" << this
1739           << ", fd=" << fd_ << ", evb=" << eventBase_ << ", state=" << state_
1740           << ", events=" << std::hex << eventFlags_;
1741   assert(eventBase_->isInEventBaseThread());
1742   if (eventFlags_ == EventHandler::NONE) {
1743     ioHandler_.unregisterHandler();
1744     return true;
1745   }
1746
1747   // Always register for persistent events, so we don't have to re-register
1748   // after being called back.
1749   if (!ioHandler_.registerHandler(eventFlags_ | EventHandler::PERSIST)) {
1750     eventFlags_ = EventHandler::NONE; // we're not registered after error
1751     AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1752         withAddr("failed to update AsyncSocket event registration"));
1753     fail("updateEventRegistration", ex);
1754     return false;
1755   }
1756
1757   return true;
1758 }
1759
1760 bool AsyncSocket::updateEventRegistration(uint16_t enable,
1761                                            uint16_t disable) {
1762   uint16_t oldFlags = eventFlags_;
1763   eventFlags_ |= enable;
1764   eventFlags_ &= ~disable;
1765   if (eventFlags_ == oldFlags) {
1766     return true;
1767   } else {
1768     return updateEventRegistration();
1769   }
1770 }
1771
1772 void AsyncSocket::startFail() {
1773   // startFail() should only be called once
1774   assert(state_ != StateEnum::ERROR);
1775   assert(getDestructorGuardCount() > 0);
1776   state_ = StateEnum::ERROR;
1777   // Ensure that SHUT_READ and SHUT_WRITE are set,
1778   // so all future attempts to read or write will be rejected
1779   shutdownFlags_ |= (SHUT_READ | SHUT_WRITE);
1780
1781   if (eventFlags_ != EventHandler::NONE) {
1782     eventFlags_ = EventHandler::NONE;
1783     ioHandler_.unregisterHandler();
1784   }
1785   writeTimeout_.cancelTimeout();
1786
1787   if (fd_ >= 0) {
1788     ioHandler_.changeHandlerFD(-1);
1789     doClose();
1790   }
1791 }
1792
1793 void AsyncSocket::finishFail() {
1794   assert(state_ == StateEnum::ERROR);
1795   assert(getDestructorGuardCount() > 0);
1796
1797   AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1798                          withAddr("socket closing after error"));
1799   if (connectCallback_) {
1800     ConnectCallback* callback = connectCallback_;
1801     connectCallback_ = nullptr;
1802     callback->connectErr(ex);
1803   }
1804
1805   failAllWrites(ex);
1806
1807   if (readCallback_) {
1808     ReadCallback* callback = readCallback_;
1809     readCallback_ = nullptr;
1810     callback->readErr(ex);
1811   }
1812 }
1813
1814 void AsyncSocket::fail(const char* fn, const AsyncSocketException& ex) {
1815   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1816              << state_ << " host=" << addr_.describe()
1817              << "): failed in " << fn << "(): "
1818              << ex.what();
1819   startFail();
1820   finishFail();
1821 }
1822
1823 void AsyncSocket::failConnect(const char* fn, const AsyncSocketException& ex) {
1824   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1825                << state_ << " host=" << addr_.describe()
1826                << "): failed while connecting in " << fn << "(): "
1827                << ex.what();
1828   startFail();
1829
1830   if (connectCallback_ != nullptr) {
1831     ConnectCallback* callback = connectCallback_;
1832     connectCallback_ = nullptr;
1833     callback->connectErr(ex);
1834   }
1835
1836   finishFail();
1837 }
1838
1839 void AsyncSocket::failRead(const char* fn, const AsyncSocketException& ex) {
1840   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1841                << state_ << " host=" << addr_.describe()
1842                << "): failed while reading in " << fn << "(): "
1843                << ex.what();
1844   startFail();
1845
1846   if (readCallback_ != nullptr) {
1847     ReadCallback* callback = readCallback_;
1848     readCallback_ = nullptr;
1849     callback->readErr(ex);
1850   }
1851
1852   finishFail();
1853 }
1854
1855 void AsyncSocket::failWrite(const char* fn, const AsyncSocketException& ex) {
1856   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1857                << state_ << " host=" << addr_.describe()
1858                << "): failed while writing in " << fn << "(): "
1859                << ex.what();
1860   startFail();
1861
1862   // Only invoke the first write callback, since the error occurred while
1863   // writing this request.  Let any other pending write callbacks be invoked in
1864   // finishFail().
1865   if (writeReqHead_ != nullptr) {
1866     WriteRequest* req = writeReqHead_;
1867     writeReqHead_ = req->getNext();
1868     WriteCallback* callback = req->getCallback();
1869     uint32_t bytesWritten = req->getTotalBytesWritten();
1870     req->destroy();
1871     if (callback) {
1872       callback->writeErr(bytesWritten, ex);
1873     }
1874   }
1875
1876   finishFail();
1877 }
1878
1879 void AsyncSocket::failWrite(const char* fn, WriteCallback* callback,
1880                              size_t bytesWritten,
1881                              const AsyncSocketException& ex) {
1882   // This version of failWrite() is used when the failure occurs before
1883   // we've added the callback to writeReqHead_.
1884   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1885              << state_ << " host=" << addr_.describe()
1886              <<"): failed while writing in " << fn << "(): "
1887              << ex.what();
1888   startFail();
1889
1890   if (callback != nullptr) {
1891     callback->writeErr(bytesWritten, ex);
1892   }
1893
1894   finishFail();
1895 }
1896
1897 void AsyncSocket::failAllWrites(const AsyncSocketException& ex) {
1898   // Invoke writeError() on all write callbacks.
1899   // This is used when writes are forcibly shutdown with write requests
1900   // pending, or when an error occurs with writes pending.
1901   while (writeReqHead_ != nullptr) {
1902     WriteRequest* req = writeReqHead_;
1903     writeReqHead_ = req->getNext();
1904     WriteCallback* callback = req->getCallback();
1905     if (callback) {
1906       callback->writeErr(req->getTotalBytesWritten(), ex);
1907     }
1908     req->destroy();
1909   }
1910 }
1911
1912 void AsyncSocket::invalidState(ConnectCallback* callback) {
1913   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_
1914              << "): connect() called in invalid state " << state_;
1915
1916   /*
1917    * The invalidState() methods don't use the normal failure mechanisms,
1918    * since we don't know what state we are in.  We don't want to call
1919    * startFail()/finishFail() recursively if we are already in the middle of
1920    * cleaning up.
1921    */
1922
1923   AsyncSocketException ex(AsyncSocketException::ALREADY_OPEN,
1924                          "connect() called with socket in invalid state");
1925   if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
1926     if (callback) {
1927       callback->connectErr(ex);
1928     }
1929   } else {
1930     // We can't use failConnect() here since connectCallback_
1931     // may already be set to another callback.  Invoke this ConnectCallback
1932     // here; any other connectCallback_ will be invoked in finishFail()
1933     startFail();
1934     if (callback) {
1935       callback->connectErr(ex);
1936     }
1937     finishFail();
1938   }
1939 }
1940
1941 void AsyncSocket::invalidState(ReadCallback* callback) {
1942   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_
1943              << "): setReadCallback(" << callback
1944              << ") called in invalid state " << state_;
1945
1946   AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
1947                          "setReadCallback() called with socket in "
1948                          "invalid state");
1949   if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
1950     if (callback) {
1951       callback->readErr(ex);
1952     }
1953   } else {
1954     startFail();
1955     if (callback) {
1956       callback->readErr(ex);
1957     }
1958     finishFail();
1959   }
1960 }
1961
1962 void AsyncSocket::invalidState(WriteCallback* callback) {
1963   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_
1964              << "): write() called in invalid state " << state_;
1965
1966   AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
1967                          withAddr("write() called with socket in invalid state"));
1968   if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
1969     if (callback) {
1970       callback->writeErr(0, ex);
1971     }
1972   } else {
1973     startFail();
1974     if (callback) {
1975       callback->writeErr(0, ex);
1976     }
1977     finishFail();
1978   }
1979 }
1980
1981 void AsyncSocket::doClose() {
1982   if (fd_ == -1) return;
1983   if (shutdownSocketSet_) {
1984     shutdownSocketSet_->close(fd_);
1985   } else {
1986     ::close(fd_);
1987   }
1988   fd_ = -1;
1989 }
1990
1991 std::ostream& operator << (std::ostream& os,
1992                            const AsyncSocket::StateEnum& state) {
1993   os << static_cast<int>(state);
1994   return os;
1995 }
1996
1997 std::string AsyncSocket::withAddr(const std::string& s) {
1998   // Don't use addr_ directly because it may not be initialized
1999   // e.g. if constructed from fd
2000   folly::SocketAddress peer, local;
2001   try {
2002     getPeerAddress(&peer);
2003     getLocalAddress(&local);
2004   } catch (const std::exception&) {
2005     // ignore
2006   } catch (...) {
2007     // ignore
2008   }
2009   return s + " (peer=" + peer.describe() + ", local=" + local.describe() + ")";
2010 }
2011
2012 } // folly