2 * Copyright 2014-present Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
21 #include <folly/io/IOBuf.h>
22 #include <folly/io/async/AsyncSocketBase.h>
23 #include <folly/io/async/DelayedDestruction.h>
24 #include <folly/io/async/EventBase.h>
25 #include <folly/portability/OpenSSL.h>
26 #include <folly/portability/SysUio.h>
27 #include <folly/ssl/OpenSSLPtrTypes.h>
29 constexpr bool kOpenSslModeMoveBufferOwnership =
30 #ifdef SSL_MODE_MOVE_BUFFER_OWNERSHIP
39 class AsyncSocketException;
44 * flags given by the application for write* calls
46 enum class WriteFlags : uint32_t {
49 * Whether to delay the output until a subsequent non-corked write.
50 * (Note: may not be supported in all subclasses or on all platforms.)
54 * for a socket that has ACK latency enabled, it will cause the kernel
55 * to fire a TCP ESTATS event when the last byte of the given write call
56 * is acknowledged.
60 * this indicates that only the write side of the socket should be shut down
62 WRITE_SHUTDOWN = 0x04,
64 * use msg zerocopy if allowed
66 WRITE_MSG_ZEROCOPY = 0x08,
72 inline WriteFlags operator|(WriteFlags a, WriteFlags b) {
73 return static_cast<WriteFlags>(
74 static_cast<uint32_t>(a) | static_cast<uint32_t>(b));
78 * compound assignment union operator
80 inline WriteFlags& operator|=(WriteFlags& a, WriteFlags b) {
86 * intersection operator
88 inline WriteFlags operator&(WriteFlags a, WriteFlags b) {
89 return static_cast<WriteFlags>(
90 static_cast<uint32_t>(a) & static_cast<uint32_t>(b));
94 * compound assignment intersection operator
96 inline WriteFlags& operator&=(WriteFlags& a, WriteFlags b) {
102 * complement operator (used to exclude flags)
104 inline WriteFlags operator~(WriteFlags a) {
105 return static_cast<WriteFlags>(~static_cast<uint32_t>(a));
111 inline WriteFlags unSet(WriteFlags a, WriteFlags b) {
118 inline bool isSet(WriteFlags a, WriteFlags b) {
124 * AsyncTransport defines an asynchronous API for streaming I/O.
126 * This class provides an API for asynchronously waiting for data
127 * on a streaming transport, and for asynchronously sending data.
129 * The APIs for reading and writing are intentionally asymmetric. Waiting for
130 * data to read is a persistent API: a callback is installed, and is notified
131 * whenever new data is available. It continues to be notified of new events
132 * until it is uninstalled.
134 * AsyncTransport does not provide read timeout functionality, because it
135 * typically cannot determine when the timeout should be active. Generally, a
136 * timeout should only be enabled when processing is blocked waiting on data
137 * from the remote endpoint. For server-side applications, the timeout should
138 * not be active if the server is currently processing one or more outstanding
139 * requests on this transport. For client-side applications, the timeout
140 * should not be active if there are no requests pending on the transport.
141 * Additionally, if a client has multiple pending requests, it will usually
142 * want a separate timeout for each request, rather than a single read timeout.
144 * The write API is fairly intuitive: a user can request to send a block of
145 * data, and a callback will be informed once the entire block has been
146 * transferred to the kernel, or on error. AsyncTransport does provide a send
147 * timeout, since most callers want to give up if the remote end stops
148 * responding and no further progress can be made sending the data.
150 class AsyncTransport : public DelayedDestruction, public AsyncSocketBase {
152 typedef std::unique_ptr<AsyncTransport, Destructor> UniquePtr;
155 * Close the transport.
157 * This gracefully closes the transport, waiting for all pending write
158 * requests to complete before actually closing the underlying transport.
160 * If a read callback is set, readEOF() will be called immediately. If there
161 * are outstanding write requests, the close will be delayed until all
162 * remaining writes have completed. No new writes may be started after
163 * close() has been called.
165 virtual void close() = 0;
168 * Close the transport immediately.
170 * This closes the transport immediately, dropping any outstanding data
171 * waiting to be written.
173 * If a read callback is set, readEOF() will be called immediately.
174 * If there are outstanding write requests, these requests will be aborted
175 * and writeErr() will be invoked immediately on all outstanding write
178 virtual void closeNow() = 0;
181 * Reset the transport immediately.
183 * This closes the transport immediately, sending a reset to the remote peer
184 * if possible to indicate abnormal shutdown.
186 * Note that not all subclasses implement this reset functionality: some
187 * subclasses may treat closeWithReset() the same as closeNow(). Subclasses
188 * that use TCP transports should terminate the connection with a TCP reset.
190 virtual void closeWithReset() {
195 * Perform a half-shutdown of the write side of the transport.
197 * The caller should not make any more calls to write() or writev() after
198 * shutdownWrite() is called. Any future write attempts will fail
201 * Not all transport types support half-shutdown. If the underlying
202 * transport does not support half-shutdown, it will fully shutdown both the
203 * read and write sides of the transport. (Fully shutting down the socket is
204 * better than doing nothing at all, since the caller may rely on the
205 * shutdownWrite() call to notify the other end of the connection that no
206 * more data can be read.)
208 * If there is pending data still waiting to be written on the transport,
209 * the actual shutdown will be delayed until the pending data has been
212 * Note: There is no corresponding shutdownRead() equivalent. Simply
213 * uninstall the read callback if you wish to stop reading. (On TCP sockets
214 * at least, shutting down the read side of the socket is a no-op anyway.)
216 virtual void shutdownWrite() = 0;
219 * Perform a half-shutdown of the write side of the transport.
221 * shutdownWriteNow() is identical to shutdownWrite(), except that it
222 * immediately performs the shutdown, rather than waiting for pending writes
223 * to complete. Any pending write requests will be immediately failed when
224 * shutdownWriteNow() is called.
226 virtual void shutdownWriteNow() = 0;
229 * Determine if transport is open and ready to read or write.
231 * Note that this function returns false on EOF; you must also call error()
232 * to distinguish between an EOF and an error.
234 * @return true iff the transport is open and ready, false otherwise.
236 virtual bool good() const = 0;
239 * Determine if the transport is readable or not.
241 * @return true iff the transport is readable, false otherwise.
243 virtual bool readable() const = 0;
246 * Determine if the transport is writable or not.
248 * @return true iff the transport is writable, false otherwise.
250 virtual bool writable() const {
251 // By default return good() - leave it to implementers to override.
256 * Determine if there is pending data on the transport.
258 * @return true iff there is pending data, false otherwise.
260 virtual bool isPending() const {
265 * Determine if transport is connected to the endpoint
267 * @return false iff the transport is connected, otherwise true
269 virtual bool connecting() const = 0;
272 * Determine if an error has occurred with this transport.
274 * @return true iff an error has occurred (not EOF).
276 virtual bool error() const = 0;
279 * Attach the transport to an EventBase.
281 * This may only be called if the transport is not currently attached to an
282 * EventBase (by an earlier call to detachEventBase()).
284 * This method must be invoked in the EventBase's thread.
286 virtual void attachEventBase(EventBase* eventBase) = 0;
289 * Detach the transport from its EventBase.
291 * This may only be called when the transport is idle and has no reads or
292 * writes pending. Once detached, the transport may not be used again until
293 * it is re-attached to an EventBase by calling attachEventBase().
295 * This method must be called from the current EventBase's thread.
297 virtual void detachEventBase() = 0;
300 * Determine if the transport can be detached.
302 * This method must be called from the current EventBase's thread.
304 virtual bool isDetachable() const = 0;
307 * Set the send timeout.
309 * If write requests do not make any progress for more than the specified
310 * number of milliseconds, fail all pending writes and close the transport.
312 * If write requests are currently pending when setSendTimeout() is called,
313 * the timeout interval is immediately restarted using the new value.
315 * @param milliseconds The timeout duration, in milliseconds. If 0, no
316 * timeout will be used.
318 virtual void setSendTimeout(uint32_t milliseconds) = 0;
321 * Get the send timeout.
323 * @return Returns the current send timeout, in milliseconds. A return value
324 * of 0 indicates that no timeout is set.
326 virtual uint32_t getSendTimeout() const = 0;
329 * Get the address of the local endpoint of this transport.
331 * This function may throw AsyncSocketException on error.
333 * @param address The local address will be stored in the specified
336 virtual void getLocalAddress(SocketAddress* address) const = 0;
339 * Get the address of the local endpoint of this transport.
342 * This function may throw AsyncSocketException on error.
344 * @return Return the local address
346 SocketAddress getLocalAddress() const {
348 getLocalAddress(&addr);
352 void getAddress(SocketAddress* address) const override {
353 getLocalAddress(address);
357 * Get the address of the remote endpoint to which this transport is
360 * This function may throw AsyncSocketException on error.
362 * @param address The remote endpoint's address will be stored in the
363 * specified SocketAddress.
365 virtual void getPeerAddress(SocketAddress* address) const = 0;
368 * Get the address of the remote endpoint to which this transport is
371 * This function may throw AsyncSocketException on error.
373 * @return Return the remote endpoint's address
375 SocketAddress getPeerAddress() const {
377 getPeerAddress(&addr);
382 * Get the certificate used to authenticate the peer.
384 virtual ssl::X509UniquePtr getPeerCert() const { return nullptr; }
387 * The local certificate used for this connection. May be null
389 virtual const X509* getSelfCert() const {
394 * Return the application protocol being used by the underlying transport
395 * protocol. This is useful for transports which are used to tunnel other
398 virtual std::string getApplicationProtocol() noexcept {
403 * Returns the name of the security protocol being used.
405 virtual std::string getSecurityProtocol() const {
410 * @return True iff end of record tracking is enabled
412 virtual bool isEorTrackingEnabled() const = 0;
414 virtual void setEorTracking(bool track) = 0;
416 virtual size_t getAppBytesWritten() const = 0;
417 virtual size_t getRawBytesWritten() const = 0;
418 virtual size_t getAppBytesReceived() const = 0;
419 virtual size_t getRawBytesReceived() const = 0;
421 class BufferCallback {
423 virtual ~BufferCallback() {}
424 virtual void onEgressBuffered() = 0;
425 virtual void onEgressBufferCleared() = 0;
429 * Callback class to signal when a transport that did not have replay
430 * protection gains replay protection. This is needed for 0-RTT security
433 class ReplaySafetyCallback {
435 virtual ~ReplaySafetyCallback() = default;
438 * Called when the transport becomes replay safe.
440 virtual void onReplaySafe() = 0;
444 * False if the transport does not have replay protection, but will in the
447 virtual bool isReplaySafe() const { return true; }
450 * Set the ReplaySafetyCallback on this transport.
452 * This should only be called if isReplaySafe() returns false.
454 virtual void setReplaySafetyCallback(ReplaySafetyCallback* callback) {
456 CHECK(false) << "setReplaySafetyCallback() not supported";
461 ~AsyncTransport() override = default;
468 virtual ~ReadCallback() = default;
471 * When data becomes available, getReadBuffer() will be invoked to get the
472 * buffer into which data should be read.
474 * This method allows the ReadCallback to delay buffer allocation until
475 * data becomes available. This allows applications to manage large
476 * numbers of idle connections, without having to maintain a separate read
477 * buffer for each idle connection.
479 * It is possible that in some cases, getReadBuffer() may be called
480 * multiple times before readDataAvailable() is invoked. In this case, the
481 * data will be written to the buffer returned from the most recent call to
482 * getReadBuffer(). If the previous calls to getReadBuffer()
483 * returned different buffers, the ReadCallback is responsible for ensuring
484 * that they are not leaked.
486 * If getReadBuffer() throws an exception, returns a nullptr buffer, or
487 * returns a 0 length, the ReadCallback will be uninstalled and its
488 * readErr() method will be invoked.
490 * getReadBuffer() is not allowed to change the transport state before it
491 * returns. (For example, it should never uninstall the read callback, or
492 * set a different read callback.)
494 * @param bufReturn getReadBuffer() should update *bufReturn to contain the
495 * address of the read buffer. This parameter will never
497 * @param lenReturn getReadBuffer() should update *lenReturn to contain the
498 * maximum number of bytes that may be written to the read
499 * buffer. This parameter will never be nullptr.
501 virtual void getReadBuffer(void** bufReturn, size_t* lenReturn) = 0;
504 * readDataAvailable() will be invoked when data has been successfully read
505 * into the buffer returned by the last call to getReadBuffer().
507 * The read callback remains installed after readDataAvailable() returns.
508 * It must be explicitly uninstalled to stop receiving read events.
509 * getReadBuffer() will be called at least once before each call to
510 * readDataAvailable(). getReadBuffer() will also be called before any
513 * @param len The number of bytes placed in the buffer.
516 virtual void readDataAvailable(size_t len) noexcept = 0;
519 * When data becomes available, isBufferMovable() will be invoked to figure
520 * out which API will be used, readBufferAvailable() or
521 * readDataAvailable(). If isBufferMovable() returns true, that means
522 * ReadCallback supports the IOBuf ownership transfer and
523 * readBufferAvailable() will be used. Otherwise, not.
525 * By default, isBufferMovable() always returns false. If
526 * readBufferAvailable() is implemented and is to be invoked, you should
527 * override isBufferMovable() and return true in the derived class.
529 * This method allows the AsyncSocket/AsyncSSLSocket to do buffer allocation
530 * by itself until data becomes available. Compared with the pre/post buffer
531 * allocation in getReadBuffer()/readDataAvailable(), readBufferAvailable()
532 * has two advantages. First, this can avoid memcpy. E.g., in
533 * AsyncSSLSocket, the decrypted data was copied from the openssl internal
534 * buffer to the readbuf buffer. With the buffer ownership transfer, the
535 * internal buffer can be directly "moved" to ReadCallback. Second, the
536 * memory allocation can be more precise. The reason is
537 * AsyncSocket/AsyncSSLSocket can allocate the memory of precise size
538 * because they have more context about the available data than
539 * ReadCallback. Think about the getReadBuffer() pre-allocate 4072 bytes
540 * buffer, but the available data is always 16KB (max OpenSSL record size).
543 virtual bool isBufferMovable() noexcept {
548 * Suggested buffer size, allocated for read operations,
549 * if callback is movable and supports folly::IOBuf
552 virtual size_t maxBufferSize() const {
553 return 64 * 1024; // 64K
557 * readBufferAvailable() will be invoked when data has been successfully
560 * Note that only either readBufferAvailable() or readDataAvailable() will
561 * be invoked according to the return value of isBufferMovable(). The timing
562 * and aftereffect of readBufferAvailable() are the same as
563 * readDataAvailable()
565 * @param readBuf The unique pointer of read buffer.
568 virtual void readBufferAvailable(std::unique_ptr<IOBuf> /*readBuf*/)
572 * readEOF() will be invoked when the transport is closed.
574 * The read callback will be automatically uninstalled immediately before
575 * readEOF() is invoked.
577 virtual void readEOF() noexcept = 0;
580 * readErr() will be invoked if an error occurs reading from the
583 * The read callback will be automatically uninstalled immediately before
584 * readErr() is invoked.
586 * @param ex An exception describing the error that occurred.
588 virtual void readErr(const AsyncSocketException& ex) noexcept = 0;
591 // Read methods that aren't part of AsyncTransport.
592 virtual void setReadCB(ReadCallback* callback) = 0;
593 virtual ReadCallback* getReadCallback() const = 0;
596 virtual ~AsyncReader() = default;
601 class WriteCallback {
603 virtual ~WriteCallback() = default;
606 * writeSuccess() will be invoked when all of the data has been
607 * successfully written.
609 * Note that this mainly signals that the buffer containing the data to
610 * write is no longer needed and may be freed or re-used. It does not
611 * guarantee that the data has been fully transmitted to the remote
612 * endpoint. For example, on socket-based transports, writeSuccess() only
613 * indicates that the data has been given to the kernel for eventual
616 virtual void writeSuccess() noexcept = 0;
619 * writeErr() will be invoked if an error occurs writing the data.
621 * @param bytesWritten The number of bytes that were successfully written.
622 * @param ex An exception describing the error that occurred.
624 virtual void writeErr(size_t bytesWritten,
625 const AsyncSocketException& ex) noexcept = 0;
628 // Write methods that aren't part of AsyncTransport
629 virtual void write(WriteCallback* callback, const void* buf, size_t bytes,
630 WriteFlags flags = WriteFlags::NONE) = 0;
631 virtual void writev(WriteCallback* callback, const iovec* vec, size_t count,
632 WriteFlags flags = WriteFlags::NONE) = 0;
633 virtual void writeChain(WriteCallback* callback,
634 std::unique_ptr<IOBuf>&& buf,
635 WriteFlags flags = WriteFlags::NONE) = 0;
638 virtual ~AsyncWriter() = default;
641 // Transitional intermediate interface. This is deprecated.
642 // Wrapper around folly::AsyncTransport, that includes read/write callbacks
643 class AsyncTransportWrapper : virtual public AsyncTransport,
644 virtual public AsyncReader,
645 virtual public AsyncWriter {
647 using UniquePtr = std::unique_ptr<AsyncTransportWrapper, Destructor>;
649 // Alias for inherited members from AsyncReader and AsyncWriter
650 // to keep compatibility.
651 using ReadCallback = AsyncReader::ReadCallback;
652 using WriteCallback = AsyncWriter::WriteCallback;
653 void setReadCB(ReadCallback* callback) override = 0;
654 ReadCallback* getReadCallback() const override = 0;
656 WriteCallback* callback,
659 WriteFlags flags = WriteFlags::NONE) override = 0;
661 WriteCallback* callback,
664 WriteFlags flags = WriteFlags::NONE) override = 0;
666 WriteCallback* callback,
667 std::unique_ptr<IOBuf>&& buf,
668 WriteFlags flags = WriteFlags::NONE) override = 0;
670 * The transport wrapper may wrap another transport. This returns the
671 * transport that is wrapped. It returns nullptr if there is no wrapped
674 virtual const AsyncTransportWrapper* getWrappedTransport() const {
679 * In many cases we need to set socket properties or otherwise access the
680 * underlying transport from a wrapped transport. This method allows access to
681 * the derived classes of the underlying transport.
684 const T* getUnderlyingTransport() const {
685 const AsyncTransportWrapper* current = this;
687 auto sock = dynamic_cast<const T*>(current);
691 current = current->getWrappedTransport();
697 T* getUnderlyingTransport() {
698 return const_cast<T*>(static_cast<const AsyncTransportWrapper*>(this)
699 ->getUnderlyingTransport<T>());