Allow a AsyncSocket to be corked the whole time
[folly.git] / folly / io / async / AsyncSocket.cpp
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/io/async/AsyncSocket.h>
18
19 #include <folly/io/async/EventBase.h>
20 #include <folly/io/async/EventHandler.h>
21 #include <folly/SocketAddress.h>
22 #include <folly/io/IOBuf.h>
23
24 #include <poll.h>
25 #include <errno.h>
26 #include <limits.h>
27 #include <unistd.h>
28 #include <thread>
29 #include <fcntl.h>
30 #include <sys/types.h>
31 #include <sys/socket.h>
32 #include <netinet/in.h>
33 #include <netinet/tcp.h>
34 #include <boost/preprocessor/control/if.hpp>
35
36 using std::string;
37 using std::unique_ptr;
38
39 namespace folly {
40
41 // static members initializers
42 const AsyncSocket::OptionMap AsyncSocket::emptyOptionMap;
43
44 const AsyncSocketException socketClosedLocallyEx(
45     AsyncSocketException::END_OF_FILE, "socket closed locally");
46 const AsyncSocketException socketShutdownForWritesEx(
47     AsyncSocketException::END_OF_FILE, "socket shutdown for writes");
48
49 // TODO: It might help performance to provide a version of BytesWriteRequest that
50 // users could derive from, so we can avoid the extra allocation for each call
51 // to write()/writev().  We could templatize TFramedAsyncChannel just like the
52 // protocols are currently templatized for transports.
53 //
54 // We would need the version for external users where they provide the iovec
55 // storage space, and only our internal version would allocate it at the end of
56 // the WriteRequest.
57
58 /* The default WriteRequest implementation, used for write(), writev() and
59  * writeChain()
60  *
61  * A new BytesWriteRequest operation is allocated on the heap for all write
62  * operations that cannot be completed immediately.
63  */
64 class AsyncSocket::BytesWriteRequest : public AsyncSocket::WriteRequest {
65  public:
66   static BytesWriteRequest* newRequest(
67       AsyncSocket* socket,
68       WriteCallback* callback,
69       const iovec* ops,
70       uint32_t opCount,
71       uint32_t partialWritten,
72       uint32_t bytesWritten,
73       unique_ptr<IOBuf>&& ioBuf,
74       WriteFlags flags,
75       BufferCallback* bufferCallback = nullptr) {
76     assert(opCount > 0);
77     // Since we put a variable size iovec array at the end
78     // of each BytesWriteRequest, we have to manually allocate the memory.
79     void* buf = malloc(sizeof(BytesWriteRequest) +
80                        (opCount * sizeof(struct iovec)));
81     if (buf == nullptr) {
82       throw std::bad_alloc();
83     }
84
85     return new(buf) BytesWriteRequest(socket, callback, ops, opCount,
86                                       partialWritten, bytesWritten,
87                                       std::move(ioBuf), flags, bufferCallback);
88   }
89
90   void destroy() override {
91     this->~BytesWriteRequest();
92     free(this);
93   }
94
95   bool performWrite() override {
96     WriteFlags writeFlags = flags_;
97     if (getNext() != nullptr) {
98       writeFlags = writeFlags | WriteFlags::CORK;
99     }
100     bytesWritten_ = socket_->performWrite(getOps(), getOpCount(), writeFlags,
101                                           &opsWritten_, &partialBytes_);
102     return bytesWritten_ >= 0;
103   }
104
105   bool isComplete() override {
106     return opsWritten_ == getOpCount();
107   }
108
109   void consume() override {
110     // Advance opIndex_ forward by opsWritten_
111     opIndex_ += opsWritten_;
112     assert(opIndex_ < opCount_);
113
114     // If we've finished writing any IOBufs, release them
115     if (ioBuf_) {
116       for (uint32_t i = opsWritten_; i != 0; --i) {
117         assert(ioBuf_);
118         ioBuf_ = ioBuf_->pop();
119       }
120     }
121
122     // Move partialBytes_ forward into the current iovec buffer
123     struct iovec* currentOp = writeOps_ + opIndex_;
124     assert((partialBytes_ < currentOp->iov_len) || (currentOp->iov_len == 0));
125     currentOp->iov_base =
126       reinterpret_cast<uint8_t*>(currentOp->iov_base) + partialBytes_;
127     currentOp->iov_len -= partialBytes_;
128
129     // Increment the totalBytesWritten_ count by bytesWritten_;
130     totalBytesWritten_ += bytesWritten_;
131   }
132
133  private:
134   BytesWriteRequest(AsyncSocket* socket,
135                     WriteCallback* callback,
136                     const struct iovec* ops,
137                     uint32_t opCount,
138                     uint32_t partialBytes,
139                     uint32_t bytesWritten,
140                     unique_ptr<IOBuf>&& ioBuf,
141                     WriteFlags flags,
142                     BufferCallback* bufferCallback = nullptr)
143     : AsyncSocket::WriteRequest(socket, callback, bufferCallback)
144     , opCount_(opCount)
145     , opIndex_(0)
146     , flags_(flags)
147     , ioBuf_(std::move(ioBuf))
148     , opsWritten_(0)
149     , partialBytes_(partialBytes)
150     , bytesWritten_(bytesWritten) {
151     memcpy(writeOps_, ops, sizeof(*ops) * opCount_);
152   }
153
154   // private destructor, to ensure callers use destroy()
155   ~BytesWriteRequest() override = default;
156
157   const struct iovec* getOps() const {
158     assert(opCount_ > opIndex_);
159     return writeOps_ + opIndex_;
160   }
161
162   uint32_t getOpCount() const {
163     assert(opCount_ > opIndex_);
164     return opCount_ - opIndex_;
165   }
166
167   uint32_t opCount_;            ///< number of entries in writeOps_
168   uint32_t opIndex_;            ///< current index into writeOps_
169   WriteFlags flags_;            ///< set for WriteFlags
170   unique_ptr<IOBuf> ioBuf_;     ///< underlying IOBuf, or nullptr if N/A
171
172   // for consume(), how much we wrote on the last write
173   uint32_t opsWritten_;         ///< complete ops written
174   uint32_t partialBytes_;       ///< partial bytes of incomplete op written
175   ssize_t bytesWritten_;        ///< bytes written altogether
176
177   struct iovec writeOps_[];     ///< write operation(s) list
178 };
179
180 AsyncSocket::AsyncSocket()
181   : eventBase_(nullptr)
182   , writeTimeout_(this, nullptr)
183   , ioHandler_(this, nullptr)
184   , immediateReadHandler_(this) {
185   VLOG(5) << "new AsyncSocket()";
186   init();
187 }
188
189 AsyncSocket::AsyncSocket(EventBase* evb)
190   : eventBase_(evb)
191   , writeTimeout_(this, evb)
192   , ioHandler_(this, evb)
193   , immediateReadHandler_(this) {
194   VLOG(5) << "new AsyncSocket(" << this << ", evb=" << evb << ")";
195   init();
196 }
197
198 AsyncSocket::AsyncSocket(EventBase* evb,
199                            const folly::SocketAddress& address,
200                            uint32_t connectTimeout)
201   : AsyncSocket(evb) {
202   connect(nullptr, address, connectTimeout);
203 }
204
205 AsyncSocket::AsyncSocket(EventBase* evb,
206                            const std::string& ip,
207                            uint16_t port,
208                            uint32_t connectTimeout)
209   : AsyncSocket(evb) {
210   connect(nullptr, ip, port, connectTimeout);
211 }
212
213 AsyncSocket::AsyncSocket(EventBase* evb, int fd)
214   : eventBase_(evb)
215   , writeTimeout_(this, evb)
216   , ioHandler_(this, evb, fd)
217   , immediateReadHandler_(this) {
218   VLOG(5) << "new AsyncSocket(" << this << ", evb=" << evb << ", fd="
219           << fd << ")";
220   init();
221   fd_ = fd;
222   setCloseOnExec();
223   state_ = StateEnum::ESTABLISHED;
224 }
225
226 // init() method, since constructor forwarding isn't supported in most
227 // compilers yet.
228 void AsyncSocket::init() {
229   assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
230   shutdownFlags_ = 0;
231   state_ = StateEnum::UNINIT;
232   eventFlags_ = EventHandler::NONE;
233   fd_ = -1;
234   sendTimeout_ = 0;
235   maxReadsPerEvent_ = 16;
236   connectCallback_ = nullptr;
237   readCallback_ = nullptr;
238   writeReqHead_ = nullptr;
239   writeReqTail_ = nullptr;
240   shutdownSocketSet_ = nullptr;
241   appBytesWritten_ = 0;
242   appBytesReceived_ = 0;
243 }
244
245 AsyncSocket::~AsyncSocket() {
246   VLOG(7) << "actual destruction of AsyncSocket(this=" << this
247           << ", evb=" << eventBase_ << ", fd=" << fd_
248           << ", state=" << state_ << ")";
249 }
250
251 void AsyncSocket::destroy() {
252   VLOG(5) << "AsyncSocket::destroy(this=" << this << ", evb=" << eventBase_
253           << ", fd=" << fd_ << ", state=" << state_;
254   // When destroy is called, close the socket immediately
255   closeNow();
256
257   // Then call DelayedDestruction::destroy() to take care of
258   // whether or not we need immediate or delayed destruction
259   DelayedDestruction::destroy();
260 }
261
262 int AsyncSocket::detachFd() {
263   VLOG(6) << "AsyncSocket::detachFd(this=" << this << ", fd=" << fd_
264           << ", evb=" << eventBase_ << ", state=" << state_
265           << ", events=" << std::hex << eventFlags_ << ")";
266   // Extract the fd, and set fd_ to -1 first, so closeNow() won't
267   // actually close the descriptor.
268   if (shutdownSocketSet_) {
269     shutdownSocketSet_->remove(fd_);
270   }
271   int fd = fd_;
272   fd_ = -1;
273   // Call closeNow() to invoke all pending callbacks with an error.
274   closeNow();
275   // Update the EventHandler to stop using this fd.
276   // This can only be done after closeNow() unregisters the handler.
277   ioHandler_.changeHandlerFD(-1);
278   return fd;
279 }
280
281 const folly::SocketAddress& AsyncSocket::anyAddress() {
282   static const folly::SocketAddress anyAddress =
283     folly::SocketAddress("0.0.0.0", 0);
284   return anyAddress;
285 }
286
287 void AsyncSocket::setShutdownSocketSet(ShutdownSocketSet* newSS) {
288   if (shutdownSocketSet_ == newSS) {
289     return;
290   }
291   if (shutdownSocketSet_ && fd_ != -1) {
292     shutdownSocketSet_->remove(fd_);
293   }
294   shutdownSocketSet_ = newSS;
295   if (shutdownSocketSet_ && fd_ != -1) {
296     shutdownSocketSet_->add(fd_);
297   }
298 }
299
300 void AsyncSocket::setCloseOnExec() {
301   int rv = fcntl(fd_, F_SETFD, FD_CLOEXEC);
302   if (rv != 0) {
303     throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
304                                withAddr("failed to set close-on-exec flag"),
305                                errno);
306   }
307 }
308
309 void AsyncSocket::connect(ConnectCallback* callback,
310                            const folly::SocketAddress& address,
311                            int timeout,
312                            const OptionMap &options,
313                            const folly::SocketAddress& bindAddr) noexcept {
314   DestructorGuard dg(this);
315   assert(eventBase_->isInEventBaseThread());
316
317   addr_ = address;
318
319   // Make sure we're in the uninitialized state
320   if (state_ != StateEnum::UNINIT) {
321     return invalidState(callback);
322   }
323
324   connectStartTime_ = std::chrono::steady_clock::now();
325   // Make connect end time at least >= connectStartTime.
326   connectEndTime_ = connectStartTime_;
327
328   assert(fd_ == -1);
329   state_ = StateEnum::CONNECTING;
330   connectCallback_ = callback;
331
332   sockaddr_storage addrStorage;
333   sockaddr* saddr = reinterpret_cast<sockaddr*>(&addrStorage);
334
335   try {
336     // Create the socket
337     // Technically the first parameter should actually be a protocol family
338     // constant (PF_xxx) rather than an address family (AF_xxx), but the
339     // distinction is mainly just historical.  In pretty much all
340     // implementations the PF_foo and AF_foo constants are identical.
341     fd_ = socket(address.getFamily(), SOCK_STREAM, 0);
342     if (fd_ < 0) {
343       throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
344                                 withAddr("failed to create socket"), errno);
345     }
346     if (shutdownSocketSet_) {
347       shutdownSocketSet_->add(fd_);
348     }
349     ioHandler_.changeHandlerFD(fd_);
350
351     setCloseOnExec();
352
353     // Put the socket in non-blocking mode
354     int flags = fcntl(fd_, F_GETFL, 0);
355     if (flags == -1) {
356       throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
357                                 withAddr("failed to get socket flags"), errno);
358     }
359     int rv = fcntl(fd_, F_SETFL, flags | O_NONBLOCK);
360     if (rv == -1) {
361       throw AsyncSocketException(
362           AsyncSocketException::INTERNAL_ERROR,
363           withAddr("failed to put socket in non-blocking mode"),
364           errno);
365     }
366
367 #if !defined(MSG_NOSIGNAL) && defined(F_SETNOSIGPIPE)
368     // iOS and OS X don't support MSG_NOSIGNAL; set F_SETNOSIGPIPE instead
369     rv = fcntl(fd_, F_SETNOSIGPIPE, 1);
370     if (rv == -1) {
371       throw AsyncSocketException(
372           AsyncSocketException::INTERNAL_ERROR,
373           "failed to enable F_SETNOSIGPIPE on socket",
374           errno);
375     }
376 #endif
377
378     // By default, turn on TCP_NODELAY
379     // If setNoDelay() fails, we continue anyway; this isn't a fatal error.
380     // setNoDelay() will log an error message if it fails.
381     if (address.getFamily() != AF_UNIX) {
382       (void)setNoDelay(true);
383     }
384
385     VLOG(5) << "AsyncSocket::connect(this=" << this << ", evb=" << eventBase_
386             << ", fd=" << fd_ << ", host=" << address.describe().c_str();
387
388     // bind the socket
389     if (bindAddr != anyAddress()) {
390       int one = 1;
391       if (::setsockopt(fd_, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one))) {
392         doClose();
393         throw AsyncSocketException(
394           AsyncSocketException::NOT_OPEN,
395           "failed to setsockopt prior to bind on " + bindAddr.describe(),
396           errno);
397       }
398
399       bindAddr.getAddress(&addrStorage);
400
401       if (::bind(fd_, saddr, bindAddr.getActualSize()) != 0) {
402         doClose();
403         throw AsyncSocketException(AsyncSocketException::NOT_OPEN,
404                                   "failed to bind to async socket: " +
405                                   bindAddr.describe(),
406                                   errno);
407       }
408     }
409
410     // Apply the additional options if any.
411     for (const auto& opt: options) {
412       int rv = opt.first.apply(fd_, opt.second);
413       if (rv != 0) {
414         throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
415                                   withAddr("failed to set socket option"),
416                                   errno);
417       }
418     }
419
420     // Perform the connect()
421     address.getAddress(&addrStorage);
422
423     rv = ::connect(fd_, saddr, address.getActualSize());
424     if (rv < 0) {
425       if (errno == EINPROGRESS) {
426         // Connection in progress.
427         if (timeout > 0) {
428           // Start a timer in case the connection takes too long.
429           if (!writeTimeout_.scheduleTimeout(timeout)) {
430             throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
431                 withAddr("failed to schedule AsyncSocket connect timeout"));
432           }
433         }
434
435         // Register for write events, so we'll
436         // be notified when the connection finishes/fails.
437         // Note that we don't register for a persistent event here.
438         assert(eventFlags_ == EventHandler::NONE);
439         eventFlags_ = EventHandler::WRITE;
440         if (!ioHandler_.registerHandler(eventFlags_)) {
441           throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
442               withAddr("failed to register AsyncSocket connect handler"));
443         }
444         return;
445       } else {
446         throw AsyncSocketException(AsyncSocketException::NOT_OPEN,
447                                   "connect failed (immediately)", errno);
448       }
449     }
450
451     // If we're still here the connect() succeeded immediately.
452     // Fall through to call the callback outside of this try...catch block
453   } catch (const AsyncSocketException& ex) {
454     return failConnect(__func__, ex);
455   } catch (const std::exception& ex) {
456     // shouldn't happen, but handle it just in case
457     VLOG(4) << "AsyncSocket::connect(this=" << this << ", fd=" << fd_
458                << "): unexpected " << typeid(ex).name() << " exception: "
459                << ex.what();
460     AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR,
461                             withAddr(string("unexpected exception: ") +
462                                      ex.what()));
463     return failConnect(__func__, tex);
464   }
465
466   // The connection succeeded immediately
467   // The read callback may not have been set yet, and no writes may be pending
468   // yet, so we don't have to register for any events at the moment.
469   VLOG(8) << "AsyncSocket::connect succeeded immediately; this=" << this;
470   assert(readCallback_ == nullptr);
471   assert(writeReqHead_ == nullptr);
472   state_ = StateEnum::ESTABLISHED;
473   invokeConnectSuccess();
474 }
475
476 void AsyncSocket::connect(ConnectCallback* callback,
477                            const string& ip, uint16_t port,
478                            int timeout,
479                            const OptionMap &options) noexcept {
480   DestructorGuard dg(this);
481   try {
482     connectCallback_ = callback;
483     connect(callback, folly::SocketAddress(ip, port), timeout, options);
484   } catch (const std::exception& ex) {
485     AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR,
486                             ex.what());
487     return failConnect(__func__, tex);
488   }
489 }
490
491 void AsyncSocket::cancelConnect() {
492   connectCallback_ = nullptr;
493   if (state_ == StateEnum::CONNECTING) {
494     closeNow();
495   }
496 }
497
498 void AsyncSocket::setSendTimeout(uint32_t milliseconds) {
499   sendTimeout_ = milliseconds;
500   assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
501
502   // If we are currently pending on write requests, immediately update
503   // writeTimeout_ with the new value.
504   if ((eventFlags_ & EventHandler::WRITE) &&
505       (state_ != StateEnum::CONNECTING)) {
506     assert(state_ == StateEnum::ESTABLISHED);
507     assert((shutdownFlags_ & SHUT_WRITE) == 0);
508     if (sendTimeout_ > 0) {
509       if (!writeTimeout_.scheduleTimeout(sendTimeout_)) {
510         AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
511             withAddr("failed to reschedule send timeout in setSendTimeout"));
512         return failWrite(__func__, ex);
513       }
514     } else {
515       writeTimeout_.cancelTimeout();
516     }
517   }
518 }
519
520 void AsyncSocket::setReadCB(ReadCallback *callback) {
521   VLOG(6) << "AsyncSocket::setReadCallback() this=" << this << ", fd=" << fd_
522           << ", callback=" << callback << ", state=" << state_;
523
524   // Short circuit if callback is the same as the existing readCallback_.
525   //
526   // Note that this is needed for proper functioning during some cleanup cases.
527   // During cleanup we allow setReadCallback(nullptr) to be called even if the
528   // read callback is already unset and we have been detached from an event
529   // base.  This check prevents us from asserting
530   // eventBase_->isInEventBaseThread() when eventBase_ is nullptr.
531   if (callback == readCallback_) {
532     return;
533   }
534
535   /* We are removing a read callback */
536   if (callback == nullptr &&
537       immediateReadHandler_.isLoopCallbackScheduled()) {
538     immediateReadHandler_.cancelLoopCallback();
539   }
540
541   if (shutdownFlags_ & SHUT_READ) {
542     // Reads have already been shut down on this socket.
543     //
544     // Allow setReadCallback(nullptr) to be called in this case, but don't
545     // allow a new callback to be set.
546     //
547     // For example, setReadCallback(nullptr) can happen after an error if we
548     // invoke some other error callback before invoking readError().  The other
549     // error callback that is invoked first may go ahead and clear the read
550     // callback before we get a chance to invoke readError().
551     if (callback != nullptr) {
552       return invalidState(callback);
553     }
554     assert((eventFlags_ & EventHandler::READ) == 0);
555     readCallback_ = nullptr;
556     return;
557   }
558
559   DestructorGuard dg(this);
560   assert(eventBase_->isInEventBaseThread());
561
562   switch ((StateEnum)state_) {
563     case StateEnum::CONNECTING:
564       // For convenience, we allow the read callback to be set while we are
565       // still connecting.  We just store the callback for now.  Once the
566       // connection completes we'll register for read events.
567       readCallback_ = callback;
568       return;
569     case StateEnum::ESTABLISHED:
570     {
571       readCallback_ = callback;
572       uint16_t oldFlags = eventFlags_;
573       if (readCallback_) {
574         eventFlags_ |= EventHandler::READ;
575       } else {
576         eventFlags_ &= ~EventHandler::READ;
577       }
578
579       // Update our registration if our flags have changed
580       if (eventFlags_ != oldFlags) {
581         // We intentionally ignore the return value here.
582         // updateEventRegistration() will move us into the error state if it
583         // fails, and we don't need to do anything else here afterwards.
584         (void)updateEventRegistration();
585       }
586
587       if (readCallback_) {
588         checkForImmediateRead();
589       }
590       return;
591     }
592     case StateEnum::CLOSED:
593     case StateEnum::ERROR:
594       // We should never reach here.  SHUT_READ should always be set
595       // if we are in STATE_CLOSED or STATE_ERROR.
596       assert(false);
597       return invalidState(callback);
598     case StateEnum::UNINIT:
599       // We do not allow setReadCallback() to be called before we start
600       // connecting.
601       return invalidState(callback);
602   }
603
604   // We don't put a default case in the switch statement, so that the compiler
605   // will warn us to update the switch statement if a new state is added.
606   return invalidState(callback);
607 }
608
609 AsyncSocket::ReadCallback* AsyncSocket::getReadCallback() const {
610   return readCallback_;
611 }
612
613 void AsyncSocket::write(WriteCallback* callback,
614                          const void* buf, size_t bytes, WriteFlags flags,
615                          BufferCallback* bufCallback) {
616   iovec op;
617   op.iov_base = const_cast<void*>(buf);
618   op.iov_len = bytes;
619   writeImpl(callback, &op, 1, unique_ptr<IOBuf>(), flags, bufCallback);
620 }
621
622 void AsyncSocket::writev(WriteCallback* callback,
623                           const iovec* vec,
624                           size_t count,
625                           WriteFlags flags,
626                           BufferCallback* bufCallback) {
627   writeImpl(callback, vec, count, unique_ptr<IOBuf>(), flags, bufCallback);
628 }
629
630 void AsyncSocket::writeChain(WriteCallback* callback, unique_ptr<IOBuf>&& buf,
631                               WriteFlags flags, BufferCallback* bufCallback) {
632   constexpr size_t kSmallSizeMax = 64;
633   size_t count = buf->countChainElements();
634   if (count <= kSmallSizeMax) {
635     iovec vec[BOOST_PP_IF(FOLLY_HAVE_VLA, count, kSmallSizeMax)];
636     writeChainImpl(callback, vec, count, std::move(buf), flags, bufCallback);
637   } else {
638     iovec* vec = new iovec[count];
639     writeChainImpl(callback, vec, count, std::move(buf), flags, bufCallback);
640     delete[] vec;
641   }
642 }
643
644 void AsyncSocket::writeChainImpl(WriteCallback* callback, iovec* vec,
645     size_t count, unique_ptr<IOBuf>&& buf, WriteFlags flags,
646     BufferCallback* bufCallback) {
647   size_t veclen = buf->fillIov(vec, count);
648   writeImpl(callback, vec, veclen, std::move(buf), flags, bufCallback);
649 }
650
651 void AsyncSocket::writeImpl(WriteCallback* callback, const iovec* vec,
652                              size_t count, unique_ptr<IOBuf>&& buf,
653                              WriteFlags flags, BufferCallback* bufCallback) {
654   VLOG(6) << "AsyncSocket::writev() this=" << this << ", fd=" << fd_
655           << ", callback=" << callback << ", count=" << count
656           << ", state=" << state_;
657   DestructorGuard dg(this);
658   unique_ptr<IOBuf>ioBuf(std::move(buf));
659   assert(eventBase_->isInEventBaseThread());
660
661   if (shutdownFlags_ & (SHUT_WRITE | SHUT_WRITE_PENDING)) {
662     // No new writes may be performed after the write side of the socket has
663     // been shutdown.
664     //
665     // We could just call callback->writeError() here to fail just this write.
666     // However, fail hard and use invalidState() to fail all outstanding
667     // callbacks and move the socket into the error state.  There's most likely
668     // a bug in the caller's code, so we abort everything rather than trying to
669     // proceed as best we can.
670     return invalidState(callback);
671   }
672
673   uint32_t countWritten = 0;
674   uint32_t partialWritten = 0;
675   int bytesWritten = 0;
676   bool mustRegister = false;
677   if (state_ == StateEnum::ESTABLISHED && !connecting()) {
678     if (writeReqHead_ == nullptr) {
679       // If we are established and there are no other writes pending,
680       // we can attempt to perform the write immediately.
681       assert(writeReqTail_ == nullptr);
682       assert((eventFlags_ & EventHandler::WRITE) == 0);
683
684       bytesWritten = performWrite(vec, count, flags,
685                                   &countWritten, &partialWritten);
686       if (bytesWritten < 0) {
687         AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
688                                withAddr("writev failed"), errno);
689         return failWrite(__func__, callback, 0, ex);
690       } else if (countWritten == count) {
691         // We successfully wrote everything.
692         // Invoke the callback and return.
693         if (callback) {
694           callback->writeSuccess();
695         }
696         return;
697       } else { // continue writing the next writeReq
698         if (bufCallback) {
699           bufCallback->onEgressBuffered();
700         }
701       }
702       mustRegister = true;
703     }
704   } else if (!connecting()) {
705     // Invalid state for writing
706     return invalidState(callback);
707   }
708
709   // Create a new WriteRequest to add to the queue
710   WriteRequest* req;
711   try {
712     req = BytesWriteRequest::newRequest(this, callback, vec + countWritten,
713                                         count - countWritten, partialWritten,
714                                         bytesWritten, std::move(ioBuf), flags,
715                                         bufCallback);
716   } catch (const std::exception& ex) {
717     // we mainly expect to catch std::bad_alloc here
718     AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR,
719         withAddr(string("failed to append new WriteRequest: ") + ex.what()));
720     return failWrite(__func__, callback, bytesWritten, tex);
721   }
722   req->consume();
723   if (writeReqTail_ == nullptr) {
724     assert(writeReqHead_ == nullptr);
725     writeReqHead_ = writeReqTail_ = req;
726   } else {
727     writeReqTail_->append(req);
728     writeReqTail_ = req;
729   }
730
731   // Register for write events if are established and not currently
732   // waiting on write events
733   if (mustRegister) {
734     assert(state_ == StateEnum::ESTABLISHED);
735     assert((eventFlags_ & EventHandler::WRITE) == 0);
736     if (!updateEventRegistration(EventHandler::WRITE, 0)) {
737       assert(state_ == StateEnum::ERROR);
738       return;
739     }
740     if (sendTimeout_ > 0) {
741       // Schedule a timeout to fire if the write takes too long.
742       if (!writeTimeout_.scheduleTimeout(sendTimeout_)) {
743         AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
744                                withAddr("failed to schedule send timeout"));
745         return failWrite(__func__, ex);
746       }
747     }
748   }
749 }
750
751 void AsyncSocket::writeRequest(WriteRequest* req) {
752   if (writeReqTail_ == nullptr) {
753     assert(writeReqHead_ == nullptr);
754     writeReqHead_ = writeReqTail_ = req;
755     req->start();
756   } else {
757     writeReqTail_->append(req);
758     writeReqTail_ = req;
759   }
760 }
761
762 void AsyncSocket::close() {
763   VLOG(5) << "AsyncSocket::close(): this=" << this << ", fd_=" << fd_
764           << ", state=" << state_ << ", shutdownFlags="
765           << std::hex << (int) shutdownFlags_;
766
767   // close() is only different from closeNow() when there are pending writes
768   // that need to drain before we can close.  In all other cases, just call
769   // closeNow().
770   //
771   // Note that writeReqHead_ can be non-nullptr even in STATE_CLOSED or
772   // STATE_ERROR if close() is invoked while a previous closeNow() or failure
773   // is still running.  (e.g., If there are multiple pending writes, and we
774   // call writeError() on the first one, it may call close().  In this case we
775   // will already be in STATE_CLOSED or STATE_ERROR, but the remaining pending
776   // writes will still be in the queue.)
777   //
778   // We only need to drain pending writes if we are still in STATE_CONNECTING
779   // or STATE_ESTABLISHED
780   if ((writeReqHead_ == nullptr) ||
781       !(state_ == StateEnum::CONNECTING ||
782       state_ == StateEnum::ESTABLISHED)) {
783     closeNow();
784     return;
785   }
786
787   // Declare a DestructorGuard to ensure that the AsyncSocket cannot be
788   // destroyed until close() returns.
789   DestructorGuard dg(this);
790   assert(eventBase_->isInEventBaseThread());
791
792   // Since there are write requests pending, we have to set the
793   // SHUT_WRITE_PENDING flag, and wait to perform the real close until the
794   // connect finishes and we finish writing these requests.
795   //
796   // Set SHUT_READ to indicate that reads are shut down, and set the
797   // SHUT_WRITE_PENDING flag to mark that we want to shutdown once the
798   // pending writes complete.
799   shutdownFlags_ |= (SHUT_READ | SHUT_WRITE_PENDING);
800
801   // If a read callback is set, invoke readEOF() immediately to inform it that
802   // the socket has been closed and no more data can be read.
803   if (readCallback_) {
804     // Disable reads if they are enabled
805     if (!updateEventRegistration(0, EventHandler::READ)) {
806       // We're now in the error state; callbacks have been cleaned up
807       assert(state_ == StateEnum::ERROR);
808       assert(readCallback_ == nullptr);
809     } else {
810       ReadCallback* callback = readCallback_;
811       readCallback_ = nullptr;
812       callback->readEOF();
813     }
814   }
815 }
816
817 void AsyncSocket::closeNow() {
818   VLOG(5) << "AsyncSocket::closeNow(): this=" << this << ", fd_=" << fd_
819           << ", state=" << state_ << ", shutdownFlags="
820           << std::hex << (int) shutdownFlags_;
821   DestructorGuard dg(this);
822   assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
823
824   switch (state_) {
825     case StateEnum::ESTABLISHED:
826     case StateEnum::CONNECTING:
827     {
828       shutdownFlags_ |= (SHUT_READ | SHUT_WRITE);
829       state_ = StateEnum::CLOSED;
830
831       // If the write timeout was set, cancel it.
832       writeTimeout_.cancelTimeout();
833
834       // If we are registered for I/O events, unregister.
835       if (eventFlags_ != EventHandler::NONE) {
836         eventFlags_ = EventHandler::NONE;
837         if (!updateEventRegistration()) {
838           // We will have been moved into the error state.
839           assert(state_ == StateEnum::ERROR);
840           return;
841         }
842       }
843
844       if (immediateReadHandler_.isLoopCallbackScheduled()) {
845         immediateReadHandler_.cancelLoopCallback();
846       }
847
848       if (fd_ >= 0) {
849         ioHandler_.changeHandlerFD(-1);
850         doClose();
851       }
852
853       invokeConnectErr(socketClosedLocallyEx);
854
855       failAllWrites(socketClosedLocallyEx);
856
857       if (readCallback_) {
858         ReadCallback* callback = readCallback_;
859         readCallback_ = nullptr;
860         callback->readEOF();
861       }
862       return;
863     }
864     case StateEnum::CLOSED:
865       // Do nothing.  It's possible that we are being called recursively
866       // from inside a callback that we invoked inside another call to close()
867       // that is still running.
868       return;
869     case StateEnum::ERROR:
870       // Do nothing.  The error handling code has performed (or is performing)
871       // cleanup.
872       return;
873     case StateEnum::UNINIT:
874       assert(eventFlags_ == EventHandler::NONE);
875       assert(connectCallback_ == nullptr);
876       assert(readCallback_ == nullptr);
877       assert(writeReqHead_ == nullptr);
878       shutdownFlags_ |= (SHUT_READ | SHUT_WRITE);
879       state_ = StateEnum::CLOSED;
880       return;
881   }
882
883   LOG(DFATAL) << "AsyncSocket::closeNow() (this=" << this << ", fd=" << fd_
884               << ") called in unknown state " << state_;
885 }
886
887 void AsyncSocket::closeWithReset() {
888   // Enable SO_LINGER, with the linger timeout set to 0.
889   // This will trigger a TCP reset when we close the socket.
890   if (fd_ >= 0) {
891     struct linger optLinger = {1, 0};
892     if (setSockOpt(SOL_SOCKET, SO_LINGER, &optLinger) != 0) {
893       VLOG(2) << "AsyncSocket::closeWithReset(): error setting SO_LINGER "
894               << "on " << fd_ << ": errno=" << errno;
895     }
896   }
897
898   // Then let closeNow() take care of the rest
899   closeNow();
900 }
901
902 void AsyncSocket::shutdownWrite() {
903   VLOG(5) << "AsyncSocket::shutdownWrite(): this=" << this << ", fd=" << fd_
904           << ", state=" << state_ << ", shutdownFlags="
905           << std::hex << (int) shutdownFlags_;
906
907   // If there are no pending writes, shutdownWrite() is identical to
908   // shutdownWriteNow().
909   if (writeReqHead_ == nullptr) {
910     shutdownWriteNow();
911     return;
912   }
913
914   assert(eventBase_->isInEventBaseThread());
915
916   // There are pending writes.  Set SHUT_WRITE_PENDING so that the actual
917   // shutdown will be performed once all writes complete.
918   shutdownFlags_ |= SHUT_WRITE_PENDING;
919 }
920
921 void AsyncSocket::shutdownWriteNow() {
922   VLOG(5) << "AsyncSocket::shutdownWriteNow(): this=" << this
923           << ", fd=" << fd_ << ", state=" << state_
924           << ", shutdownFlags=" << std::hex << (int) shutdownFlags_;
925
926   if (shutdownFlags_ & SHUT_WRITE) {
927     // Writes are already shutdown; nothing else to do.
928     return;
929   }
930
931   // If SHUT_READ is already set, just call closeNow() to completely
932   // close the socket.  This can happen if close() was called with writes
933   // pending, and then shutdownWriteNow() is called before all pending writes
934   // complete.
935   if (shutdownFlags_ & SHUT_READ) {
936     closeNow();
937     return;
938   }
939
940   DestructorGuard dg(this);
941   assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
942
943   switch (static_cast<StateEnum>(state_)) {
944     case StateEnum::ESTABLISHED:
945     {
946       shutdownFlags_ |= SHUT_WRITE;
947
948       // If the write timeout was set, cancel it.
949       writeTimeout_.cancelTimeout();
950
951       // If we are registered for write events, unregister.
952       if (!updateEventRegistration(0, EventHandler::WRITE)) {
953         // We will have been moved into the error state.
954         assert(state_ == StateEnum::ERROR);
955         return;
956       }
957
958       // Shutdown writes on the file descriptor
959       ::shutdown(fd_, SHUT_WR);
960
961       // Immediately fail all write requests
962       failAllWrites(socketShutdownForWritesEx);
963       return;
964     }
965     case StateEnum::CONNECTING:
966     {
967       // Set the SHUT_WRITE_PENDING flag.
968       // When the connection completes, it will check this flag,
969       // shutdown the write half of the socket, and then set SHUT_WRITE.
970       shutdownFlags_ |= SHUT_WRITE_PENDING;
971
972       // Immediately fail all write requests
973       failAllWrites(socketShutdownForWritesEx);
974       return;
975     }
976     case StateEnum::UNINIT:
977       // Callers normally shouldn't call shutdownWriteNow() before the socket
978       // even starts connecting.  Nonetheless, go ahead and set
979       // SHUT_WRITE_PENDING.  Once the socket eventually connects it will
980       // immediately shut down the write side of the socket.
981       shutdownFlags_ |= SHUT_WRITE_PENDING;
982       return;
983     case StateEnum::CLOSED:
984     case StateEnum::ERROR:
985       // We should never get here.  SHUT_WRITE should always be set
986       // in STATE_CLOSED and STATE_ERROR.
987       VLOG(4) << "AsyncSocket::shutdownWriteNow() (this=" << this
988                  << ", fd=" << fd_ << ") in unexpected state " << state_
989                  << " with SHUT_WRITE not set ("
990                  << std::hex << (int) shutdownFlags_ << ")";
991       assert(false);
992       return;
993   }
994
995   LOG(DFATAL) << "AsyncSocket::shutdownWriteNow() (this=" << this << ", fd="
996               << fd_ << ") called in unknown state " << state_;
997 }
998
999 bool AsyncSocket::readable() const {
1000   if (fd_ == -1) {
1001     return false;
1002   }
1003   struct pollfd fds[1];
1004   fds[0].fd = fd_;
1005   fds[0].events = POLLIN;
1006   fds[0].revents = 0;
1007   int rc = poll(fds, 1, 0);
1008   return rc == 1;
1009 }
1010
1011 bool AsyncSocket::isPending() const {
1012   return ioHandler_.isPending();
1013 }
1014
1015 bool AsyncSocket::hangup() const {
1016   if (fd_ == -1) {
1017     // sanity check, no one should ask for hangup if we are not connected.
1018     assert(false);
1019     return false;
1020   }
1021 #ifdef POLLRDHUP // Linux-only
1022   struct pollfd fds[1];
1023   fds[0].fd = fd_;
1024   fds[0].events = POLLRDHUP|POLLHUP;
1025   fds[0].revents = 0;
1026   poll(fds, 1, 0);
1027   return (fds[0].revents & (POLLRDHUP|POLLHUP)) != 0;
1028 #else
1029   return false;
1030 #endif
1031 }
1032
1033 bool AsyncSocket::good() const {
1034   return ((state_ == StateEnum::CONNECTING ||
1035           state_ == StateEnum::ESTABLISHED) &&
1036           (shutdownFlags_ == 0) && (eventBase_ != nullptr));
1037 }
1038
1039 bool AsyncSocket::error() const {
1040   return (state_ == StateEnum::ERROR);
1041 }
1042
1043 void AsyncSocket::attachEventBase(EventBase* eventBase) {
1044   VLOG(5) << "AsyncSocket::attachEventBase(this=" << this << ", fd=" << fd_
1045           << ", old evb=" << eventBase_ << ", new evb=" << eventBase
1046           << ", state=" << state_ << ", events="
1047           << std::hex << eventFlags_ << ")";
1048   assert(eventBase_ == nullptr);
1049   assert(eventBase->isInEventBaseThread());
1050
1051   eventBase_ = eventBase;
1052   ioHandler_.attachEventBase(eventBase);
1053   writeTimeout_.attachEventBase(eventBase);
1054 }
1055
1056 void AsyncSocket::detachEventBase() {
1057   VLOG(5) << "AsyncSocket::detachEventBase(this=" << this << ", fd=" << fd_
1058           << ", old evb=" << eventBase_ << ", state=" << state_
1059           << ", events=" << std::hex << eventFlags_ << ")";
1060   assert(eventBase_ != nullptr);
1061   assert(eventBase_->isInEventBaseThread());
1062
1063   eventBase_ = nullptr;
1064   ioHandler_.detachEventBase();
1065   writeTimeout_.detachEventBase();
1066 }
1067
1068 bool AsyncSocket::isDetachable() const {
1069   DCHECK(eventBase_ != nullptr);
1070   DCHECK(eventBase_->isInEventBaseThread());
1071
1072   return !ioHandler_.isHandlerRegistered() && !writeTimeout_.isScheduled();
1073 }
1074
1075 void AsyncSocket::getLocalAddress(folly::SocketAddress* address) const {
1076   address->setFromLocalAddress(fd_);
1077 }
1078
1079 void AsyncSocket::getPeerAddress(folly::SocketAddress* address) const {
1080   if (!addr_.isInitialized()) {
1081     addr_.setFromPeerAddress(fd_);
1082   }
1083   *address = addr_;
1084 }
1085
1086 int AsyncSocket::setNoDelay(bool noDelay) {
1087   if (fd_ < 0) {
1088     VLOG(4) << "AsyncSocket::setNoDelay() called on non-open socket "
1089                << this << "(state=" << state_ << ")";
1090     return EINVAL;
1091
1092   }
1093
1094   int value = noDelay ? 1 : 0;
1095   if (setsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &value, sizeof(value)) != 0) {
1096     int errnoCopy = errno;
1097     VLOG(2) << "failed to update TCP_NODELAY option on AsyncSocket "
1098             << this << " (fd=" << fd_ << ", state=" << state_ << "): "
1099             << strerror(errnoCopy);
1100     return errnoCopy;
1101   }
1102
1103   return 0;
1104 }
1105
1106 int AsyncSocket::setCongestionFlavor(const std::string &cname) {
1107
1108   #ifndef TCP_CONGESTION
1109   #define TCP_CONGESTION  13
1110   #endif
1111
1112   if (fd_ < 0) {
1113     VLOG(4) << "AsyncSocket::setCongestionFlavor() called on non-open "
1114                << "socket " << this << "(state=" << state_ << ")";
1115     return EINVAL;
1116
1117   }
1118
1119   if (setsockopt(fd_, IPPROTO_TCP, TCP_CONGESTION, cname.c_str(),
1120         cname.length() + 1) != 0) {
1121     int errnoCopy = errno;
1122     VLOG(2) << "failed to update TCP_CONGESTION option on AsyncSocket "
1123             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1124             << strerror(errnoCopy);
1125     return errnoCopy;
1126   }
1127
1128   return 0;
1129 }
1130
1131 int AsyncSocket::setQuickAck(bool quickack) {
1132   if (fd_ < 0) {
1133     VLOG(4) << "AsyncSocket::setQuickAck() called on non-open socket "
1134                << this << "(state=" << state_ << ")";
1135     return EINVAL;
1136
1137   }
1138
1139 #ifdef TCP_QUICKACK // Linux-only
1140   int value = quickack ? 1 : 0;
1141   if (setsockopt(fd_, IPPROTO_TCP, TCP_QUICKACK, &value, sizeof(value)) != 0) {
1142     int errnoCopy = errno;
1143     VLOG(2) << "failed to update TCP_QUICKACK option on AsyncSocket"
1144             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1145             << strerror(errnoCopy);
1146     return errnoCopy;
1147   }
1148
1149   return 0;
1150 #else
1151   return ENOSYS;
1152 #endif
1153 }
1154
1155 int AsyncSocket::setSendBufSize(size_t bufsize) {
1156   if (fd_ < 0) {
1157     VLOG(4) << "AsyncSocket::setSendBufSize() called on non-open socket "
1158                << this << "(state=" << state_ << ")";
1159     return EINVAL;
1160   }
1161
1162   if (setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &bufsize, sizeof(bufsize)) !=0) {
1163     int errnoCopy = errno;
1164     VLOG(2) << "failed to update SO_SNDBUF option on AsyncSocket"
1165             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1166             << strerror(errnoCopy);
1167     return errnoCopy;
1168   }
1169
1170   return 0;
1171 }
1172
1173 int AsyncSocket::setRecvBufSize(size_t bufsize) {
1174   if (fd_ < 0) {
1175     VLOG(4) << "AsyncSocket::setRecvBufSize() called on non-open socket "
1176                << this << "(state=" << state_ << ")";
1177     return EINVAL;
1178   }
1179
1180   if (setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &bufsize, sizeof(bufsize)) !=0) {
1181     int errnoCopy = errno;
1182     VLOG(2) << "failed to update SO_RCVBUF option on AsyncSocket"
1183             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1184             << strerror(errnoCopy);
1185     return errnoCopy;
1186   }
1187
1188   return 0;
1189 }
1190
1191 int AsyncSocket::setTCPProfile(int profd) {
1192   if (fd_ < 0) {
1193     VLOG(4) << "AsyncSocket::setTCPProfile() called on non-open socket "
1194                << this << "(state=" << state_ << ")";
1195     return EINVAL;
1196   }
1197
1198   if (setsockopt(fd_, SOL_SOCKET, SO_SET_NAMESPACE, &profd, sizeof(int)) !=0) {
1199     int errnoCopy = errno;
1200     VLOG(2) << "failed to set socket namespace option on AsyncSocket"
1201             << this << "(fd=" << fd_ << ", state=" << state_ << "): "
1202             << strerror(errnoCopy);
1203     return errnoCopy;
1204   }
1205
1206   return 0;
1207 }
1208
1209 void AsyncSocket::setPersistentCork(bool cork) {
1210   if (setCork(cork) == 0) {
1211     persistentCork_ = cork;
1212   }
1213 }
1214
1215 int AsyncSocket::setCork(bool cork) {
1216 #ifdef TCP_CORK
1217   if (fd_ < 0) {
1218     VLOG(4) << "AsyncSocket::setCork() called on non-open socket "
1219             << this << "(stats=" << state_ << ")";
1220     return EINVAL;
1221   }
1222
1223   if (corked_ == cork) {
1224     return 0;
1225   }
1226
1227   int flag = cork ? 1 : 0;
1228   if (setsockopt(fd_, IPPROTO_TCP, TCP_CORK, &flag, sizeof(flag)) != 0) {
1229     int errnoCopy = errno;
1230     VLOG(2) << "faield to turn on TCP_CORK option on AsyncSocket"
1231             << this << "(fd=" << fd_ << ", state=" << state_ << "):"
1232             << folly::errnoStr(errnoCopy);
1233     return errnoCopy;
1234   }
1235   corked_ = cork;
1236 #endif
1237   return 0;
1238 }
1239
1240 void AsyncSocket::ioReady(uint16_t events) noexcept {
1241   VLOG(7) << "AsyncSocket::ioRead() this=" << this << ", fd" << fd_
1242           << ", events=" << std::hex << events << ", state=" << state_;
1243   DestructorGuard dg(this);
1244   assert(events & EventHandler::READ_WRITE);
1245   assert(eventBase_->isInEventBaseThread());
1246
1247   uint16_t relevantEvents = events & EventHandler::READ_WRITE;
1248   if (relevantEvents == EventHandler::READ) {
1249     handleRead();
1250   } else if (relevantEvents == EventHandler::WRITE) {
1251     handleWrite();
1252   } else if (relevantEvents == EventHandler::READ_WRITE) {
1253     EventBase* originalEventBase = eventBase_;
1254     // If both read and write events are ready, process writes first.
1255     handleWrite();
1256
1257     // Return now if handleWrite() detached us from our EventBase
1258     if (eventBase_ != originalEventBase) {
1259       return;
1260     }
1261
1262     // Only call handleRead() if a read callback is still installed.
1263     // (It's possible that the read callback was uninstalled during
1264     // handleWrite().)
1265     if (readCallback_) {
1266       handleRead();
1267     }
1268   } else {
1269     VLOG(4) << "AsyncSocket::ioRead() called with unexpected events "
1270                << std::hex << events << "(this=" << this << ")";
1271     abort();
1272   }
1273 }
1274
1275 ssize_t AsyncSocket::performRead(void** buf, size_t* buflen, size_t* offset) {
1276   VLOG(5) << "AsyncSocket::performRead() this=" << this
1277           << ", buf=" << *buf << ", buflen=" << *buflen;
1278
1279   int recvFlags = 0;
1280   if (peek_) {
1281     recvFlags |= MSG_PEEK;
1282   }
1283
1284   ssize_t bytes = recv(fd_, *buf, *buflen, MSG_DONTWAIT | recvFlags);
1285   if (bytes < 0) {
1286     if (errno == EAGAIN || errno == EWOULDBLOCK) {
1287       // No more data to read right now.
1288       return READ_BLOCKING;
1289     } else {
1290       return READ_ERROR;
1291     }
1292   } else {
1293     appBytesReceived_ += bytes;
1294     return bytes;
1295   }
1296 }
1297
1298 void AsyncSocket::prepareReadBuffer(void** buf, size_t* buflen) noexcept {
1299   // no matter what, buffer should be preapared for non-ssl socket
1300   CHECK(readCallback_);
1301   readCallback_->getReadBuffer(buf, buflen);
1302 }
1303
1304 void AsyncSocket::handleRead() noexcept {
1305   VLOG(5) << "AsyncSocket::handleRead() this=" << this << ", fd=" << fd_
1306           << ", state=" << state_;
1307   assert(state_ == StateEnum::ESTABLISHED);
1308   assert((shutdownFlags_ & SHUT_READ) == 0);
1309   assert(readCallback_ != nullptr);
1310   assert(eventFlags_ & EventHandler::READ);
1311
1312   // Loop until:
1313   // - a read attempt would block
1314   // - readCallback_ is uninstalled
1315   // - the number of loop iterations exceeds the optional maximum
1316   // - this AsyncSocket is moved to another EventBase
1317   //
1318   // When we invoke readDataAvailable() it may uninstall the readCallback_,
1319   // which is why need to check for it here.
1320   //
1321   // The last bullet point is slightly subtle.  readDataAvailable() may also
1322   // detach this socket from this EventBase.  However, before
1323   // readDataAvailable() returns another thread may pick it up, attach it to
1324   // a different EventBase, and install another readCallback_.  We need to
1325   // exit immediately after readDataAvailable() returns if the eventBase_ has
1326   // changed.  (The caller must perform some sort of locking to transfer the
1327   // AsyncSocket between threads properly.  This will be sufficient to ensure
1328   // that this thread sees the updated eventBase_ variable after
1329   // readDataAvailable() returns.)
1330   uint16_t numReads = 0;
1331   EventBase* originalEventBase = eventBase_;
1332   while (readCallback_ && eventBase_ == originalEventBase) {
1333     // Get the buffer to read into.
1334     void* buf = nullptr;
1335     size_t buflen = 0, offset = 0;
1336     try {
1337       prepareReadBuffer(&buf, &buflen);
1338       VLOG(5) << "prepareReadBuffer() buf=" << buf << ", buflen=" << buflen;
1339     } catch (const AsyncSocketException& ex) {
1340       return failRead(__func__, ex);
1341     } catch (const std::exception& ex) {
1342       AsyncSocketException tex(AsyncSocketException::BAD_ARGS,
1343                               string("ReadCallback::getReadBuffer() "
1344                                      "threw exception: ") +
1345                               ex.what());
1346       return failRead(__func__, tex);
1347     } catch (...) {
1348       AsyncSocketException ex(AsyncSocketException::BAD_ARGS,
1349                              "ReadCallback::getReadBuffer() threw "
1350                              "non-exception type");
1351       return failRead(__func__, ex);
1352     }
1353     if (!isBufferMovable_ && (buf == nullptr || buflen == 0)) {
1354       AsyncSocketException ex(AsyncSocketException::BAD_ARGS,
1355                              "ReadCallback::getReadBuffer() returned "
1356                              "empty buffer");
1357       return failRead(__func__, ex);
1358     }
1359
1360     // Perform the read
1361     ssize_t bytesRead = performRead(&buf, &buflen, &offset);
1362     VLOG(4) << "this=" << this << ", AsyncSocket::handleRead() got "
1363             << bytesRead << " bytes";
1364     if (bytesRead > 0) {
1365       if (!isBufferMovable_) {
1366         readCallback_->readDataAvailable(bytesRead);
1367       } else {
1368         CHECK(kOpenSslModeMoveBufferOwnership);
1369         VLOG(5) << "this=" << this << ", AsyncSocket::handleRead() got "
1370                 << "buf=" << buf << ", " << bytesRead << "/" << buflen
1371                 << ", offset=" << offset;
1372         auto readBuf = folly::IOBuf::takeOwnership(buf, buflen);
1373         readBuf->trimStart(offset);
1374         readBuf->trimEnd(buflen - offset - bytesRead);
1375         readCallback_->readBufferAvailable(std::move(readBuf));
1376       }
1377
1378       // Fall through and continue around the loop if the read
1379       // completely filled the available buffer.
1380       // Note that readCallback_ may have been uninstalled or changed inside
1381       // readDataAvailable().
1382       if (size_t(bytesRead) < buflen) {
1383         return;
1384       }
1385     } else if (bytesRead == READ_BLOCKING) {
1386         // No more data to read right now.
1387         return;
1388     } else if (bytesRead == READ_ERROR) {
1389       readErr_ = READ_ERROR;
1390       AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1391                              withAddr("recv() failed"), errno);
1392       return failRead(__func__, ex);
1393     } else {
1394       assert(bytesRead == READ_EOF);
1395       readErr_ = READ_EOF;
1396       // EOF
1397       shutdownFlags_ |= SHUT_READ;
1398       if (!updateEventRegistration(0, EventHandler::READ)) {
1399         // we've already been moved into STATE_ERROR
1400         assert(state_ == StateEnum::ERROR);
1401         assert(readCallback_ == nullptr);
1402         return;
1403       }
1404
1405       ReadCallback* callback = readCallback_;
1406       readCallback_ = nullptr;
1407       callback->readEOF();
1408       return;
1409     }
1410     if (maxReadsPerEvent_ && (++numReads >= maxReadsPerEvent_)) {
1411       if (readCallback_ != nullptr) {
1412         // We might still have data in the socket.
1413         // (e.g. see comment in AsyncSSLSocket::checkForImmediateRead)
1414         scheduleImmediateRead();
1415       }
1416       return;
1417     }
1418   }
1419 }
1420
1421 /**
1422  * This function attempts to write as much data as possible, until no more data
1423  * can be written.
1424  *
1425  * - If it sends all available data, it unregisters for write events, and stops
1426  *   the writeTimeout_.
1427  *
1428  * - If not all of the data can be sent immediately, it reschedules
1429  *   writeTimeout_ (if a non-zero timeout is set), and ensures the handler is
1430  *   registered for write events.
1431  */
1432 void AsyncSocket::handleWrite() noexcept {
1433   VLOG(5) << "AsyncSocket::handleWrite() this=" << this << ", fd=" << fd_
1434           << ", state=" << state_;
1435   if (state_ == StateEnum::CONNECTING) {
1436     handleConnect();
1437     return;
1438   }
1439
1440   // Normal write
1441   assert(state_ == StateEnum::ESTABLISHED);
1442   assert((shutdownFlags_ & SHUT_WRITE) == 0);
1443   assert(writeReqHead_ != nullptr);
1444
1445   // Loop until we run out of write requests,
1446   // or until this socket is moved to another EventBase.
1447   // (See the comment in handleRead() explaining how this can happen.)
1448   EventBase* originalEventBase = eventBase_;
1449   while (writeReqHead_ != nullptr && eventBase_ == originalEventBase) {
1450     if (!writeReqHead_->performWrite()) {
1451       AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1452                              withAddr("writev() failed"), errno);
1453       return failWrite(__func__, ex);
1454     } else if (writeReqHead_->isComplete()) {
1455       // We finished this request
1456       WriteRequest* req = writeReqHead_;
1457       writeReqHead_ = req->getNext();
1458
1459       if (writeReqHead_ == nullptr) {
1460         writeReqTail_ = nullptr;
1461         // This is the last write request.
1462         // Unregister for write events and cancel the send timer
1463         // before we invoke the callback.  We have to update the state properly
1464         // before calling the callback, since it may want to detach us from
1465         // the EventBase.
1466         if (eventFlags_ & EventHandler::WRITE) {
1467           if (!updateEventRegistration(0, EventHandler::WRITE)) {
1468             assert(state_ == StateEnum::ERROR);
1469             return;
1470           }
1471           // Stop the send timeout
1472           writeTimeout_.cancelTimeout();
1473         }
1474         assert(!writeTimeout_.isScheduled());
1475
1476         // If SHUT_WRITE_PENDING is set, we should shutdown the socket after
1477         // we finish sending the last write request.
1478         //
1479         // We have to do this before invoking writeSuccess(), since
1480         // writeSuccess() may detach us from our EventBase.
1481         if (shutdownFlags_ & SHUT_WRITE_PENDING) {
1482           assert(connectCallback_ == nullptr);
1483           shutdownFlags_ |= SHUT_WRITE;
1484
1485           if (shutdownFlags_ & SHUT_READ) {
1486             // Reads have already been shutdown.  Fully close the socket and
1487             // move to STATE_CLOSED.
1488             //
1489             // Note: This code currently moves us to STATE_CLOSED even if
1490             // close() hasn't ever been called.  This can occur if we have
1491             // received EOF from the peer and shutdownWrite() has been called
1492             // locally.  Should we bother staying in STATE_ESTABLISHED in this
1493             // case, until close() is actually called?  I can't think of a
1494             // reason why we would need to do so.  No other operations besides
1495             // calling close() or destroying the socket can be performed at
1496             // this point.
1497             assert(readCallback_ == nullptr);
1498             state_ = StateEnum::CLOSED;
1499             if (fd_ >= 0) {
1500               ioHandler_.changeHandlerFD(-1);
1501               doClose();
1502             }
1503           } else {
1504             // Reads are still enabled, so we are only doing a half-shutdown
1505             ::shutdown(fd_, SHUT_WR);
1506           }
1507         }
1508       }
1509
1510       // Invoke the callback
1511       WriteCallback* callback = req->getCallback();
1512       req->destroy();
1513       if (callback) {
1514         callback->writeSuccess();
1515       }
1516       // We'll continue around the loop, trying to write another request
1517     } else {
1518       // Notify BufferCallback:
1519       BufferCallback* bufferCallback = writeReqHead_->getBufferCallback();
1520       if (bufferCallback) {
1521         bufferCallback->onEgressBuffered();
1522       }
1523       // Partial write.
1524       writeReqHead_->consume();
1525       // Stop after a partial write; it's highly likely that a subsequent write
1526       // attempt will just return EAGAIN.
1527       //
1528       // Ensure that we are registered for write events.
1529       if ((eventFlags_ & EventHandler::WRITE) == 0) {
1530         if (!updateEventRegistration(EventHandler::WRITE, 0)) {
1531           assert(state_ == StateEnum::ERROR);
1532           return;
1533         }
1534       }
1535
1536       // Reschedule the send timeout, since we have made some write progress.
1537       if (sendTimeout_ > 0) {
1538         if (!writeTimeout_.scheduleTimeout(sendTimeout_)) {
1539           AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1540               withAddr("failed to reschedule write timeout"));
1541           return failWrite(__func__, ex);
1542         }
1543       }
1544       return;
1545     }
1546   }
1547 }
1548
1549 void AsyncSocket::checkForImmediateRead() noexcept {
1550   // We currently don't attempt to perform optimistic reads in AsyncSocket.
1551   // (However, note that some subclasses do override this method.)
1552   //
1553   // Simply calling handleRead() here would be bad, as this would call
1554   // readCallback_->getReadBuffer(), forcing the callback to allocate a read
1555   // buffer even though no data may be available.  This would waste lots of
1556   // memory, since the buffer will sit around unused until the socket actually
1557   // becomes readable.
1558   //
1559   // Checking if the socket is readable now also seems like it would probably
1560   // be a pessimism.  In most cases it probably wouldn't be readable, and we
1561   // would just waste an extra system call.  Even if it is readable, waiting to
1562   // find out from libevent on the next event loop doesn't seem that bad.
1563 }
1564
1565 void AsyncSocket::handleInitialReadWrite() noexcept {
1566   // Our callers should already be holding a DestructorGuard, but grab
1567   // one here just to make sure, in case one of our calling code paths ever
1568   // changes.
1569   DestructorGuard dg(this);
1570
1571   // If we have a readCallback_, make sure we enable read events.  We
1572   // may already be registered for reads if connectSuccess() set
1573   // the read calback.
1574   if (readCallback_ && !(eventFlags_ & EventHandler::READ)) {
1575     assert(state_ == StateEnum::ESTABLISHED);
1576     assert((shutdownFlags_ & SHUT_READ) == 0);
1577     if (!updateEventRegistration(EventHandler::READ, 0)) {
1578       assert(state_ == StateEnum::ERROR);
1579       return;
1580     }
1581     checkForImmediateRead();
1582   } else if (readCallback_ == nullptr) {
1583     // Unregister for read events.
1584     updateEventRegistration(0, EventHandler::READ);
1585   }
1586
1587   // If we have write requests pending, try to send them immediately.
1588   // Since we just finished accepting, there is a very good chance that we can
1589   // write without blocking.
1590   //
1591   // However, we only process them if EventHandler::WRITE is not already set,
1592   // which means that we're already blocked on a write attempt.  (This can
1593   // happen if connectSuccess() called write() before returning.)
1594   if (writeReqHead_ && !(eventFlags_ & EventHandler::WRITE)) {
1595     // Call handleWrite() to perform write processing.
1596     handleWrite();
1597   } else if (writeReqHead_ == nullptr) {
1598     // Unregister for write event.
1599     updateEventRegistration(0, EventHandler::WRITE);
1600   }
1601 }
1602
1603 void AsyncSocket::handleConnect() noexcept {
1604   VLOG(5) << "AsyncSocket::handleConnect() this=" << this << ", fd=" << fd_
1605           << ", state=" << state_;
1606   assert(state_ == StateEnum::CONNECTING);
1607   // SHUT_WRITE can never be set while we are still connecting;
1608   // SHUT_WRITE_PENDING may be set, be we only set SHUT_WRITE once the connect
1609   // finishes
1610   assert((shutdownFlags_ & SHUT_WRITE) == 0);
1611
1612   // In case we had a connect timeout, cancel the timeout
1613   writeTimeout_.cancelTimeout();
1614   // We don't use a persistent registration when waiting on a connect event,
1615   // so we have been automatically unregistered now.  Update eventFlags_ to
1616   // reflect reality.
1617   assert(eventFlags_ == EventHandler::WRITE);
1618   eventFlags_ = EventHandler::NONE;
1619
1620   // Call getsockopt() to check if the connect succeeded
1621   int error;
1622   socklen_t len = sizeof(error);
1623   int rv = getsockopt(fd_, SOL_SOCKET, SO_ERROR, &error, &len);
1624   if (rv != 0) {
1625     AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1626                            withAddr("error calling getsockopt() after connect"),
1627                            errno);
1628     VLOG(4) << "AsyncSocket::handleConnect(this=" << this << ", fd="
1629                << fd_ << " host=" << addr_.describe()
1630                << ") exception:" << ex.what();
1631     return failConnect(__func__, ex);
1632   }
1633
1634   if (error != 0) {
1635     AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
1636                            "connect failed", error);
1637     VLOG(1) << "AsyncSocket::handleConnect(this=" << this << ", fd="
1638             << fd_ << " host=" << addr_.describe()
1639             << ") exception: " << ex.what();
1640     return failConnect(__func__, ex);
1641   }
1642
1643   // Move into STATE_ESTABLISHED
1644   state_ = StateEnum::ESTABLISHED;
1645
1646   // If SHUT_WRITE_PENDING is set and we don't have any write requests to
1647   // perform, immediately shutdown the write half of the socket.
1648   if ((shutdownFlags_ & SHUT_WRITE_PENDING) && writeReqHead_ == nullptr) {
1649     // SHUT_READ shouldn't be set.  If close() is called on the socket while we
1650     // are still connecting we just abort the connect rather than waiting for
1651     // it to complete.
1652     assert((shutdownFlags_ & SHUT_READ) == 0);
1653     ::shutdown(fd_, SHUT_WR);
1654     shutdownFlags_ |= SHUT_WRITE;
1655   }
1656
1657   VLOG(7) << "AsyncSocket " << this << ": fd " << fd_
1658           << "successfully connected; state=" << state_;
1659
1660   // Remember the EventBase we are attached to, before we start invoking any
1661   // callbacks (since the callbacks may call detachEventBase()).
1662   EventBase* originalEventBase = eventBase_;
1663
1664   invokeConnectSuccess();
1665   // Note that the connect callback may have changed our state.
1666   // (set or unset the read callback, called write(), closed the socket, etc.)
1667   // The following code needs to handle these situations correctly.
1668   //
1669   // If the socket has been closed, readCallback_ and writeReqHead_ will
1670   // always be nullptr, so that will prevent us from trying to read or write.
1671   //
1672   // The main thing to check for is if eventBase_ is still originalEventBase.
1673   // If not, we have been detached from this event base, so we shouldn't
1674   // perform any more operations.
1675   if (eventBase_ != originalEventBase) {
1676     return;
1677   }
1678
1679   handleInitialReadWrite();
1680 }
1681
1682 void AsyncSocket::timeoutExpired() noexcept {
1683   VLOG(7) << "AsyncSocket " << this << ", fd " << fd_ << ": timeout expired: "
1684           << "state=" << state_ << ", events=" << std::hex << eventFlags_;
1685   DestructorGuard dg(this);
1686   assert(eventBase_->isInEventBaseThread());
1687
1688   if (state_ == StateEnum::CONNECTING) {
1689     // connect() timed out
1690     // Unregister for I/O events.
1691     AsyncSocketException ex(AsyncSocketException::TIMED_OUT,
1692                            "connect timed out");
1693     failConnect(__func__, ex);
1694   } else {
1695     // a normal write operation timed out
1696     assert(state_ == StateEnum::ESTABLISHED);
1697     AsyncSocketException ex(AsyncSocketException::TIMED_OUT, "write timed out");
1698     failWrite(__func__, ex);
1699   }
1700 }
1701
1702 ssize_t AsyncSocket::performWrite(const iovec* vec,
1703                                    uint32_t count,
1704                                    WriteFlags flags,
1705                                    uint32_t* countWritten,
1706                                    uint32_t* partialWritten) {
1707   // We use sendmsg() instead of writev() so that we can pass in MSG_NOSIGNAL
1708   // We correctly handle EPIPE errors, so we never want to receive SIGPIPE
1709   // (since it may terminate the program if the main program doesn't explicitly
1710   // ignore it).
1711   struct msghdr msg;
1712   msg.msg_name = nullptr;
1713   msg.msg_namelen = 0;
1714   msg.msg_iov = const_cast<iovec *>(vec);
1715 #ifdef IOV_MAX // not defined on Android
1716   msg.msg_iovlen = std::min(count, (uint32_t)IOV_MAX);
1717 #else
1718   msg.msg_iovlen = std::min(count, (uint32_t)UIO_MAXIOV);
1719 #endif
1720   msg.msg_control = nullptr;
1721   msg.msg_controllen = 0;
1722   msg.msg_flags = 0;
1723
1724   int msg_flags = MSG_DONTWAIT;
1725
1726 #ifdef MSG_NOSIGNAL // Linux-only
1727   msg_flags |= MSG_NOSIGNAL;
1728   if (isSet(flags, WriteFlags::CORK)) {
1729     // MSG_MORE tells the kernel we have more data to send, so wait for us to
1730     // give it the rest of the data rather than immediately sending a partial
1731     // frame, even when TCP_NODELAY is enabled.
1732     msg_flags |= MSG_MORE;
1733   }
1734 #endif
1735   if (isSet(flags, WriteFlags::EOR)) {
1736     // marks that this is the last byte of a record (response)
1737     msg_flags |= MSG_EOR;
1738   }
1739   ssize_t totalWritten = ::sendmsg(fd_, &msg, msg_flags);
1740   if (totalWritten < 0) {
1741     if (errno == EAGAIN) {
1742       // TCP buffer is full; we can't write any more data right now.
1743       *countWritten = 0;
1744       *partialWritten = 0;
1745       return 0;
1746     }
1747     // error
1748     *countWritten = 0;
1749     *partialWritten = 0;
1750     return -1;
1751   }
1752
1753   appBytesWritten_ += totalWritten;
1754
1755   uint32_t bytesWritten;
1756   uint32_t n;
1757   for (bytesWritten = totalWritten, n = 0; n < count; ++n) {
1758     const iovec* v = vec + n;
1759     if (v->iov_len > bytesWritten) {
1760       // Partial write finished in the middle of this iovec
1761       *countWritten = n;
1762       *partialWritten = bytesWritten;
1763       return totalWritten;
1764     }
1765
1766     bytesWritten -= v->iov_len;
1767   }
1768
1769   assert(bytesWritten == 0);
1770   *countWritten = n;
1771   *partialWritten = 0;
1772   return totalWritten;
1773 }
1774
1775 /**
1776  * Re-register the EventHandler after eventFlags_ has changed.
1777  *
1778  * If an error occurs, fail() is called to move the socket into the error state
1779  * and call all currently installed callbacks.  After an error, the
1780  * AsyncSocket is completely unregistered.
1781  *
1782  * @return Returns true on succcess, or false on error.
1783  */
1784 bool AsyncSocket::updateEventRegistration() {
1785   VLOG(5) << "AsyncSocket::updateEventRegistration(this=" << this
1786           << ", fd=" << fd_ << ", evb=" << eventBase_ << ", state=" << state_
1787           << ", events=" << std::hex << eventFlags_;
1788   assert(eventBase_->isInEventBaseThread());
1789   if (eventFlags_ == EventHandler::NONE) {
1790     ioHandler_.unregisterHandler();
1791     return true;
1792   }
1793
1794   // Always register for persistent events, so we don't have to re-register
1795   // after being called back.
1796   if (!ioHandler_.registerHandler(eventFlags_ | EventHandler::PERSIST)) {
1797     eventFlags_ = EventHandler::NONE; // we're not registered after error
1798     AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1799         withAddr("failed to update AsyncSocket event registration"));
1800     fail("updateEventRegistration", ex);
1801     return false;
1802   }
1803
1804   return true;
1805 }
1806
1807 bool AsyncSocket::updateEventRegistration(uint16_t enable,
1808                                            uint16_t disable) {
1809   uint16_t oldFlags = eventFlags_;
1810   eventFlags_ |= enable;
1811   eventFlags_ &= ~disable;
1812   if (eventFlags_ == oldFlags) {
1813     return true;
1814   } else {
1815     return updateEventRegistration();
1816   }
1817 }
1818
1819 void AsyncSocket::startFail() {
1820   // startFail() should only be called once
1821   assert(state_ != StateEnum::ERROR);
1822   assert(getDestructorGuardCount() > 0);
1823   state_ = StateEnum::ERROR;
1824   // Ensure that SHUT_READ and SHUT_WRITE are set,
1825   // so all future attempts to read or write will be rejected
1826   shutdownFlags_ |= (SHUT_READ | SHUT_WRITE);
1827
1828   if (eventFlags_ != EventHandler::NONE) {
1829     eventFlags_ = EventHandler::NONE;
1830     ioHandler_.unregisterHandler();
1831   }
1832   writeTimeout_.cancelTimeout();
1833
1834   if (fd_ >= 0) {
1835     ioHandler_.changeHandlerFD(-1);
1836     doClose();
1837   }
1838 }
1839
1840 void AsyncSocket::finishFail() {
1841   assert(state_ == StateEnum::ERROR);
1842   assert(getDestructorGuardCount() > 0);
1843
1844   AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
1845                          withAddr("socket closing after error"));
1846   invokeConnectErr(ex);
1847   failAllWrites(ex);
1848
1849   if (readCallback_) {
1850     ReadCallback* callback = readCallback_;
1851     readCallback_ = nullptr;
1852     callback->readErr(ex);
1853   }
1854 }
1855
1856 void AsyncSocket::fail(const char* fn, const AsyncSocketException& ex) {
1857   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1858              << state_ << " host=" << addr_.describe()
1859              << "): failed in " << fn << "(): "
1860              << ex.what();
1861   startFail();
1862   finishFail();
1863 }
1864
1865 void AsyncSocket::failConnect(const char* fn, const AsyncSocketException& ex) {
1866   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1867                << state_ << " host=" << addr_.describe()
1868                << "): failed while connecting in " << fn << "(): "
1869                << ex.what();
1870   startFail();
1871
1872   invokeConnectErr(ex);
1873   finishFail();
1874 }
1875
1876 void AsyncSocket::failRead(const char* fn, const AsyncSocketException& ex) {
1877   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1878                << state_ << " host=" << addr_.describe()
1879                << "): failed while reading in " << fn << "(): "
1880                << ex.what();
1881   startFail();
1882
1883   if (readCallback_ != nullptr) {
1884     ReadCallback* callback = readCallback_;
1885     readCallback_ = nullptr;
1886     callback->readErr(ex);
1887   }
1888
1889   finishFail();
1890 }
1891
1892 void AsyncSocket::failWrite(const char* fn, const AsyncSocketException& ex) {
1893   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1894                << state_ << " host=" << addr_.describe()
1895                << "): failed while writing in " << fn << "(): "
1896                << ex.what();
1897   startFail();
1898
1899   // Only invoke the first write callback, since the error occurred while
1900   // writing this request.  Let any other pending write callbacks be invoked in
1901   // finishFail().
1902   if (writeReqHead_ != nullptr) {
1903     WriteRequest* req = writeReqHead_;
1904     writeReqHead_ = req->getNext();
1905     WriteCallback* callback = req->getCallback();
1906     uint32_t bytesWritten = req->getTotalBytesWritten();
1907     req->destroy();
1908     if (callback) {
1909       callback->writeErr(bytesWritten, ex);
1910     }
1911   }
1912
1913   finishFail();
1914 }
1915
1916 void AsyncSocket::failWrite(const char* fn, WriteCallback* callback,
1917                              size_t bytesWritten,
1918                              const AsyncSocketException& ex) {
1919   // This version of failWrite() is used when the failure occurs before
1920   // we've added the callback to writeReqHead_.
1921   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
1922              << state_ << " host=" << addr_.describe()
1923              <<"): failed while writing in " << fn << "(): "
1924              << ex.what();
1925   startFail();
1926
1927   if (callback != nullptr) {
1928     callback->writeErr(bytesWritten, ex);
1929   }
1930
1931   finishFail();
1932 }
1933
1934 void AsyncSocket::failAllWrites(const AsyncSocketException& ex) {
1935   // Invoke writeError() on all write callbacks.
1936   // This is used when writes are forcibly shutdown with write requests
1937   // pending, or when an error occurs with writes pending.
1938   while (writeReqHead_ != nullptr) {
1939     WriteRequest* req = writeReqHead_;
1940     writeReqHead_ = req->getNext();
1941     WriteCallback* callback = req->getCallback();
1942     if (callback) {
1943       callback->writeErr(req->getTotalBytesWritten(), ex);
1944     }
1945     req->destroy();
1946   }
1947 }
1948
1949 void AsyncSocket::invalidState(ConnectCallback* callback) {
1950   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_
1951              << "): connect() called in invalid state " << state_;
1952
1953   /*
1954    * The invalidState() methods don't use the normal failure mechanisms,
1955    * since we don't know what state we are in.  We don't want to call
1956    * startFail()/finishFail() recursively if we are already in the middle of
1957    * cleaning up.
1958    */
1959
1960   AsyncSocketException ex(AsyncSocketException::ALREADY_OPEN,
1961                          "connect() called with socket in invalid state");
1962   connectEndTime_ = std::chrono::steady_clock::now();
1963   if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
1964     if (callback) {
1965       callback->connectErr(ex);
1966     }
1967   } else {
1968     // We can't use failConnect() here since connectCallback_
1969     // may already be set to another callback.  Invoke this ConnectCallback
1970     // here; any other connectCallback_ will be invoked in finishFail()
1971     startFail();
1972     if (callback) {
1973       callback->connectErr(ex);
1974     }
1975     finishFail();
1976   }
1977 }
1978
1979 void AsyncSocket::invokeConnectErr(const AsyncSocketException& ex) {
1980   connectEndTime_ = std::chrono::steady_clock::now();
1981   if (connectCallback_) {
1982     ConnectCallback* callback = connectCallback_;
1983     connectCallback_ = nullptr;
1984     callback->connectErr(ex);
1985   }
1986 }
1987
1988 void AsyncSocket::invokeConnectSuccess() {
1989   connectEndTime_ = std::chrono::steady_clock::now();
1990   if (connectCallback_) {
1991     ConnectCallback* callback = connectCallback_;
1992     connectCallback_ = nullptr;
1993     callback->connectSuccess();
1994   }
1995 }
1996
1997 void AsyncSocket::invalidState(ReadCallback* callback) {
1998   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_
1999              << "): setReadCallback(" << callback
2000              << ") called in invalid state " << state_;
2001
2002   AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
2003                          "setReadCallback() called with socket in "
2004                          "invalid state");
2005   if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
2006     if (callback) {
2007       callback->readErr(ex);
2008     }
2009   } else {
2010     startFail();
2011     if (callback) {
2012       callback->readErr(ex);
2013     }
2014     finishFail();
2015   }
2016 }
2017
2018 void AsyncSocket::invalidState(WriteCallback* callback) {
2019   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_
2020              << "): write() called in invalid state " << state_;
2021
2022   AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
2023                          withAddr("write() called with socket in invalid state"));
2024   if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
2025     if (callback) {
2026       callback->writeErr(0, ex);
2027     }
2028   } else {
2029     startFail();
2030     if (callback) {
2031       callback->writeErr(0, ex);
2032     }
2033     finishFail();
2034   }
2035 }
2036
2037 void AsyncSocket::doClose() {
2038   if (fd_ == -1) return;
2039   if (shutdownSocketSet_) {
2040     shutdownSocketSet_->close(fd_);
2041   } else {
2042     ::close(fd_);
2043   }
2044   fd_ = -1;
2045 }
2046
2047 std::ostream& operator << (std::ostream& os,
2048                            const AsyncSocket::StateEnum& state) {
2049   os << static_cast<int>(state);
2050   return os;
2051 }
2052
2053 std::string AsyncSocket::withAddr(const std::string& s) {
2054   // Don't use addr_ directly because it may not be initialized
2055   // e.g. if constructed from fd
2056   folly::SocketAddress peer, local;
2057   try {
2058     getPeerAddress(&peer);
2059     getLocalAddress(&local);
2060   } catch (const std::exception&) {
2061     // ignore
2062   } catch (...) {
2063     // ignore
2064   }
2065   return s + " (peer=" + peer.describe() + ", local=" + local.describe() + ")";
2066 }
2067
2068 } // folly