APIs to determine which end of the socket has closed it
[folly.git] / folly / io / async / AsyncTransport.h
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <memory>
20 #include <sys/uio.h>
21
22 #include <folly/io/async/DelayedDestruction.h>
23 #include <folly/io/async/EventBase.h>
24 #include <folly/io/async/AsyncSocketBase.h>
25
26 #include <openssl/ssl.h>
27
28 constexpr bool kOpenSslModeMoveBufferOwnership =
29 #ifdef SSL_MODE_MOVE_BUFFER_OWNERSHIP
30   true
31 #else
32   false
33 #endif
34 ;
35
36 namespace folly {
37
38 class AsyncSocketException;
39 class EventBase;
40 class IOBuf;
41 class SocketAddress;
42
43 /*
44  * flags given by the application for write* calls
45  */
46 enum class WriteFlags : uint32_t {
47   NONE = 0x00,
48   /*
49    * Whether to delay the output until a subsequent non-corked write.
50    * (Note: may not be supported in all subclasses or on all platforms.)
51    */
52   CORK = 0x01,
53   /*
54    * for a socket that has ACK latency enabled, it will cause the kernel
55    * to fire a TCP ESTATS event when the last byte of the given write call
56    * will be acknowledged.
57    */
58   EOR = 0x02,
59 };
60
61 /*
62  * union operator
63  */
64 inline WriteFlags operator|(WriteFlags a, WriteFlags b) {
65   return static_cast<WriteFlags>(
66     static_cast<uint32_t>(a) | static_cast<uint32_t>(b));
67 }
68
69 /*
70  * intersection operator
71  */
72 inline WriteFlags operator&(WriteFlags a, WriteFlags b) {
73   return static_cast<WriteFlags>(
74     static_cast<uint32_t>(a) & static_cast<uint32_t>(b));
75 }
76
77 /*
78  * exclusion parameter
79  */
80 inline WriteFlags operator~(WriteFlags a) {
81   return static_cast<WriteFlags>(~static_cast<uint32_t>(a));
82 }
83
84 /*
85  * unset operator
86  */
87 inline WriteFlags unSet(WriteFlags a, WriteFlags b) {
88   return a & ~b;
89 }
90
91 /*
92  * inclusion operator
93  */
94 inline bool isSet(WriteFlags a, WriteFlags b) {
95   return (a & b) == b;
96 }
97
98
99 /**
100  * AsyncTransport defines an asynchronous API for streaming I/O.
101  *
102  * This class provides an API to for asynchronously waiting for data
103  * on a streaming transport, and for asynchronously sending data.
104  *
105  * The APIs for reading and writing are intentionally asymmetric.  Waiting for
106  * data to read is a persistent API: a callback is installed, and is notified
107  * whenever new data is available.  It continues to be notified of new events
108  * until it is uninstalled.
109  *
110  * AsyncTransport does not provide read timeout functionality, because it
111  * typically cannot determine when the timeout should be active.  Generally, a
112  * timeout should only be enabled when processing is blocked waiting on data
113  * from the remote endpoint.  For server-side applications, the timeout should
114  * not be active if the server is currently processing one or more outstanding
115  * requests on this transport.  For client-side applications, the timeout
116  * should not be active if there are no requests pending on the transport.
117  * Additionally, if a client has multiple pending requests, it will ususally
118  * want a separate timeout for each request, rather than a single read timeout.
119  *
120  * The write API is fairly intuitive: a user can request to send a block of
121  * data, and a callback will be informed once the entire block has been
122  * transferred to the kernel, or on error.  AsyncTransport does provide a send
123  * timeout, since most callers want to give up if the remote end stops
124  * responding and no further progress can be made sending the data.
125  */
126 class AsyncTransport : public DelayedDestruction, public AsyncSocketBase {
127  public:
128   typedef std::unique_ptr<AsyncTransport, Destructor> UniquePtr;
129
130   /**
131    * Close the transport.
132    *
133    * This gracefully closes the transport, waiting for all pending write
134    * requests to complete before actually closing the underlying transport.
135    *
136    * If a read callback is set, readEOF() will be called immediately.  If there
137    * are outstanding write requests, the close will be delayed until all
138    * remaining writes have completed.  No new writes may be started after
139    * close() has been called.
140    */
141   virtual void close() = 0;
142
143   /**
144    * Close the transport immediately.
145    *
146    * This closes the transport immediately, dropping any outstanding data
147    * waiting to be written.
148    *
149    * If a read callback is set, readEOF() will be called immediately.
150    * If there are outstanding write requests, these requests will be aborted
151    * and writeError() will be invoked immediately on all outstanding write
152    * callbacks.
153    */
154   virtual void closeNow() = 0;
155
156   /**
157    * Reset the transport immediately.
158    *
159    * This closes the transport immediately, sending a reset to the remote peer
160    * if possible to indicate abnormal shutdown.
161    *
162    * Note that not all subclasses implement this reset functionality: some
163    * subclasses may treat reset() the same as closeNow().  Subclasses that use
164    * TCP transports should terminate the connection with a TCP reset.
165    */
166   virtual void closeWithReset() {
167     closeNow();
168   }
169
170   /**
171    * Perform a half-shutdown of the write side of the transport.
172    *
173    * The caller should not make any more calls to write() or writev() after
174    * shutdownWrite() is called.  Any future write attempts will fail
175    * immediately.
176    *
177    * Not all transport types support half-shutdown.  If the underlying
178    * transport does not support half-shutdown, it will fully shutdown both the
179    * read and write sides of the transport.  (Fully shutting down the socket is
180    * better than doing nothing at all, since the caller may rely on the
181    * shutdownWrite() call to notify the other end of the connection that no
182    * more data can be read.)
183    *
184    * If there is pending data still waiting to be written on the transport,
185    * the actual shutdown will be delayed until the pending data has been
186    * written.
187    *
188    * Note: There is no corresponding shutdownRead() equivalent.  Simply
189    * uninstall the read callback if you wish to stop reading.  (On TCP sockets
190    * at least, shutting down the read side of the socket is a no-op anyway.)
191    */
192   virtual void shutdownWrite() = 0;
193
194   /**
195    * Perform a half-shutdown of the write side of the transport.
196    *
197    * shutdownWriteNow() is identical to shutdownWrite(), except that it
198    * immediately performs the shutdown, rather than waiting for pending writes
199    * to complete.  Any pending write requests will be immediately failed when
200    * shutdownWriteNow() is called.
201    */
202   virtual void shutdownWriteNow() = 0;
203
204   /**
205    * Determine if transport is open and ready to read or write.
206    *
207    * Note that this function returns false on EOF; you must also call error()
208    * to distinguish between an EOF and an error.
209    *
210    * @return  true iff the transport is open and ready, false otherwise.
211    */
212   virtual bool good() const = 0;
213
214   /**
215    * Determine if the transport is readable or not.
216    *
217    * @return  true iff the transport is readable, false otherwise.
218    */
219   virtual bool readable() const = 0;
220
221   /**
222    * Determine if the there is pending data on the transport.
223    *
224    * @return  true iff the if the there is pending data, false otherwise.
225    */
226   virtual bool isPending() const {
227     return readable();
228   }
229
230   /**
231    * Determine if transport is connected to the endpoint
232    *
233    * @return  false iff the transport is connected, otherwise true
234    */
235   virtual bool connecting() const = 0;
236
237   /**
238    * Determine if an error has occurred with this transport.
239    *
240    * @return  true iff an error has occurred (not EOF).
241    */
242   virtual bool error() const = 0;
243
244   /**
245    * Attach the transport to a EventBase.
246    *
247    * This may only be called if the transport is not currently attached to a
248    * EventBase (by an earlier call to detachEventBase()).
249    *
250    * This method must be invoked in the EventBase's thread.
251    */
252   virtual void attachEventBase(EventBase* eventBase) = 0;
253
254   /**
255    * Detach the transport from its EventBase.
256    *
257    * This may only be called when the transport is idle and has no reads or
258    * writes pending.  Once detached, the transport may not be used again until
259    * it is re-attached to a EventBase by calling attachEventBase().
260    *
261    * This method must be called from the current EventBase's thread.
262    */
263   virtual void detachEventBase() = 0;
264
265   /**
266    * Determine if the transport can be detached.
267    *
268    * This method must be called from the current EventBase's thread.
269    */
270   virtual bool isDetachable() const = 0;
271
272   /**
273    * Set the send timeout.
274    *
275    * If write requests do not make any progress for more than the specified
276    * number of milliseconds, fail all pending writes and close the transport.
277    *
278    * If write requests are currently pending when setSendTimeout() is called,
279    * the timeout interval is immediately restarted using the new value.
280    *
281    * @param milliseconds  The timeout duration, in milliseconds.  If 0, no
282    *                      timeout will be used.
283    */
284   virtual void setSendTimeout(uint32_t milliseconds) = 0;
285
286   /**
287    * Get the send timeout.
288    *
289    * @return Returns the current send timeout, in milliseconds.  A return value
290    *         of 0 indicates that no timeout is set.
291    */
292   virtual uint32_t getSendTimeout() const = 0;
293
294   /**
295    * Get the address of the local endpoint of this transport.
296    *
297    * This function may throw AsyncSocketException on error.
298    *
299    * @param address  The local address will be stored in the specified
300    *                 SocketAddress.
301    */
302   virtual void getLocalAddress(SocketAddress* address) const = 0;
303
304   virtual void getAddress(SocketAddress* address) const {
305     getLocalAddress(address);
306   }
307
308   /**
309    * Get the address of the remote endpoint to which this transport is
310    * connected.
311    *
312    * This function may throw AsyncSocketException on error.
313    *
314    * @param address  The remote endpoint's address will be stored in the
315    *                 specified SocketAddress.
316    */
317   virtual void getPeerAddress(SocketAddress* address) const = 0;
318
319   /**
320    * @return True iff end of record tracking is enabled
321    */
322   virtual bool isEorTrackingEnabled() const = 0;
323
324   virtual void setEorTracking(bool track) = 0;
325
326   virtual size_t getAppBytesWritten() const = 0;
327   virtual size_t getRawBytesWritten() const = 0;
328   virtual size_t getAppBytesReceived() const = 0;
329   virtual size_t getRawBytesReceived() const = 0;
330
331  protected:
332   virtual ~AsyncTransport() = default;
333 };
334
335 class AsyncReader {
336  public:
337   class ReadCallback {
338    public:
339     virtual ~ReadCallback() = default;
340
341     /**
342      * When data becomes available, getReadBuffer() will be invoked to get the
343      * buffer into which data should be read.
344      *
345      * This method allows the ReadCallback to delay buffer allocation until
346      * data becomes available.  This allows applications to manage large
347      * numbers of idle connections, without having to maintain a separate read
348      * buffer for each idle connection.
349      *
350      * It is possible that in some cases, getReadBuffer() may be called
351      * multiple times before readDataAvailable() is invoked.  In this case, the
352      * data will be written to the buffer returned from the most recent call to
353      * readDataAvailable().  If the previous calls to readDataAvailable()
354      * returned different buffers, the ReadCallback is responsible for ensuring
355      * that they are not leaked.
356      *
357      * If getReadBuffer() throws an exception, returns a nullptr buffer, or
358      * returns a 0 length, the ReadCallback will be uninstalled and its
359      * readError() method will be invoked.
360      *
361      * getReadBuffer() is not allowed to change the transport state before it
362      * returns.  (For example, it should never uninstall the read callback, or
363      * set a different read callback.)
364      *
365      * @param bufReturn getReadBuffer() should update *bufReturn to contain the
366      *                  address of the read buffer.  This parameter will never
367      *                  be nullptr.
368      * @param lenReturn getReadBuffer() should update *lenReturn to contain the
369      *                  maximum number of bytes that may be written to the read
370      *                  buffer.  This parameter will never be nullptr.
371      */
372     virtual void getReadBuffer(void** bufReturn, size_t* lenReturn) = 0;
373
374     /**
375      * readDataAvailable() will be invoked when data has been successfully read
376      * into the buffer returned by the last call to getReadBuffer().
377      *
378      * The read callback remains installed after readDataAvailable() returns.
379      * It must be explicitly uninstalled to stop receiving read events.
380      * getReadBuffer() will be called at least once before each call to
381      * readDataAvailable().  getReadBuffer() will also be called before any
382      * call to readEOF().
383      *
384      * @param len       The number of bytes placed in the buffer.
385      */
386
387     virtual void readDataAvailable(size_t len) noexcept = 0;
388
389     /**
390      * When data becomes available, isBufferMovable() will be invoked to figure
391      * out which API will be used, readBufferAvailable() or
392      * readDataAvailable(). If isBufferMovable() returns true, that means
393      * ReadCallback supports the IOBuf ownership transfer and
394      * readBufferAvailable() will be used.  Otherwise, not.
395
396      * By default, isBufferMovable() always return false. If
397      * readBufferAvailable() is implemented and to be invoked, You should
398      * overwrite isBufferMovable() and return true in the inherited class.
399      *
400      * This method allows the AsyncSocket/AsyncSSLSocket do buffer allocation by
401      * itself until data becomes available.  Compared with the pre/post buffer
402      * allocation in getReadBuffer()/readDataAvailabe(), readBufferAvailable()
403      * has two advantages.  First, this can avoid memcpy. E.g., in
404      * AsyncSSLSocket, the decrypted data was copied from the openssl internal
405      * buffer to the readbuf buffer.  With the buffer ownership transfer, the
406      * internal buffer can be directly "moved" to ReadCallback. Second, the
407      * memory allocation can be more precise.  The reason is
408      * AsyncSocket/AsyncSSLSocket can allocate the memory of precise size
409      * because they have more context about the available data than
410      * ReadCallback.  Think about the getReadBuffer() pre-allocate 4072 bytes
411      * buffer, but the available data is always 16KB (max OpenSSL record size).
412      */
413
414     virtual bool isBufferMovable() noexcept {
415       return false;
416     }
417
418     /**
419      * readBufferAvailable() will be invoked when data has been successfully
420      * read.
421      *
422      * Note that only either readBufferAvailable() or readDataAvailable() will
423      * be invoked according to the return value of isBufferMovable(). The timing
424      * and aftereffect of readBufferAvailable() are the same as
425      * readDataAvailable()
426      *
427      * @param readBuf The unique pointer of read buffer.
428      */
429
430     virtual void readBufferAvailable(std::unique_ptr<IOBuf> /*readBuf*/)
431       noexcept {};
432
433     /**
434      * readEOF() will be invoked when the transport is closed.
435      *
436      * The read callback will be automatically uninstalled immediately before
437      * readEOF() is invoked.
438      */
439     virtual void readEOF() noexcept = 0;
440
441     /**
442      * readError() will be invoked if an error occurs reading from the
443      * transport.
444      *
445      * The read callback will be automatically uninstalled immediately before
446      * readError() is invoked.
447      *
448      * @param ex        An exception describing the error that occurred.
449      */
450     virtual void readErr(const AsyncSocketException& ex) noexcept = 0;
451   };
452
453   // Read methods that aren't part of AsyncTransport.
454   virtual void setReadCB(ReadCallback* callback) = 0;
455   virtual ReadCallback* getReadCallback() const = 0;
456
457  protected:
458   virtual ~AsyncReader() = default;
459 };
460
461 class AsyncWriter {
462  public:
463   class WriteCallback {
464    public:
465     virtual ~WriteCallback() = default;
466
467     /**
468      * writeSuccess() will be invoked when all of the data has been
469      * successfully written.
470      *
471      * Note that this mainly signals that the buffer containing the data to
472      * write is no longer needed and may be freed or re-used.  It does not
473      * guarantee that the data has been fully transmitted to the remote
474      * endpoint.  For example, on socket-based transports, writeSuccess() only
475      * indicates that the data has been given to the kernel for eventual
476      * transmission.
477      */
478     virtual void writeSuccess() noexcept = 0;
479
480     /**
481      * writeError() will be invoked if an error occurs writing the data.
482      *
483      * @param bytesWritten      The number of bytes that were successfull
484      * @param ex                An exception describing the error that occurred.
485      */
486     virtual void writeErr(size_t bytesWritten,
487                           const AsyncSocketException& ex) noexcept = 0;
488   };
489
490   // Write methods that aren't part of AsyncTransport
491   virtual void write(WriteCallback* callback, const void* buf, size_t bytes,
492                      WriteFlags flags = WriteFlags::NONE) = 0;
493   virtual void writev(WriteCallback* callback, const iovec* vec, size_t count,
494                       WriteFlags flags = WriteFlags::NONE) = 0;
495   virtual void writeChain(WriteCallback* callback,
496                           std::unique_ptr<IOBuf>&& buf,
497                           WriteFlags flags = WriteFlags::NONE) = 0;
498
499  protected:
500   virtual ~AsyncWriter() = default;
501 };
502
503 // Transitional intermediate interface. This is deprecated.
504 // Wrapper around folly::AsyncTransport, that includes read/write callbacks
505 class AsyncTransportWrapper : virtual public AsyncTransport,
506                               virtual public AsyncReader,
507                               virtual public AsyncWriter {
508  public:
509   using UniquePtr = std::unique_ptr<AsyncTransportWrapper, Destructor>;
510
511   // Alias for inherited members from AsyncReader and AsyncWriter
512   // to keep compatibility.
513   using ReadCallback    = AsyncReader::ReadCallback;
514   using WriteCallback   = AsyncWriter::WriteCallback;
515   virtual void setReadCB(ReadCallback* callback) override = 0;
516   virtual ReadCallback* getReadCallback() const override = 0;
517   virtual void write(WriteCallback* callback, const void* buf, size_t bytes,
518                      WriteFlags flags = WriteFlags::NONE) override = 0;
519   virtual void writev(WriteCallback* callback, const iovec* vec, size_t count,
520                       WriteFlags flags = WriteFlags::NONE) override = 0;
521   virtual void writeChain(WriteCallback* callback,
522                           std::unique_ptr<IOBuf>&& buf,
523                           WriteFlags flags = WriteFlags::NONE) override = 0;
524 };
525
526 } // folly