Change kDefaultZeroCopyThreshold to 0 to avoid a regression and avoid a failure while...
[folly.git] / folly / io / async / AsyncSocket.h
index 1bf1b4debd9a075f3752321781b06ef2136033c2..fb9d2c7f5983f64245681aca39a07424b6a48475 100644 (file)
@@ -31,8 +31,8 @@
 #include <sys/types.h>
 
 #include <chrono>
-#include <memory>
 #include <map>
+#include <memory>
 
 namespace folly {
 
@@ -68,6 +68,10 @@ namespace folly {
 #define SO_NO_TRANSPARENT_TLS 200
 #endif
 
+#if defined __linux__ && !defined SO_NO_TSOCKS
+#define SO_NO_TSOCKS 201
+#endif
+
 #ifdef _MSC_VER
 // We do a dynamic_cast on this, in
 // AsyncTransportWrapper::getUnderlyingTransport so be safe and
@@ -152,8 +156,8 @@ class AsyncSocket : virtual public AsyncTransportWrapper {
      *
      * @param flags     Write flags requested for the given write operation
      */
-    int getFlags(folly::WriteFlags flags) noexcept {
-      return getFlagsImpl(flags, getDefaultFlags(flags));
+    int getFlags(folly::WriteFlags flags, bool zeroCopyEnabled) noexcept {
+      return getFlagsImpl(flags, getDefaultFlags(flags, zeroCopyEnabled));
     }
 
     /**
@@ -207,7 +211,7 @@ class AsyncSocket : virtual public AsyncTransportWrapper {
      *
      * @param flags     Write flags requested for the given write operation
      */
-    int getDefaultFlags(folly::WriteFlags flags) noexcept;
+    int getDefaultFlags(folly::WriteFlags flags, bool zeroCopyEnabled) noexcept;
   };
 
   explicit AsyncSocket();
@@ -218,7 +222,7 @@ class AsyncSocket : virtual public AsyncTransportWrapper {
    */
   explicit AsyncSocket(EventBase* evb);
 
-  void setShutdownSocketSet(ShutdownSocketSet* ss);
+  void setShutdownSocketSet(const std::weak_ptr<ShutdownSocketSet>& wSS);
 
   /**
    * Create a new AsyncSocket and begin the connection process.
@@ -500,6 +504,21 @@ class AsyncSocket : virtual public AsyncTransportWrapper {
   void setReadCB(ReadCallback* callback) override;
   ReadCallback* getReadCallback() const override;
 
+  static const size_t kDefaultZeroCopyThreshold = 0;
+
+  bool setZeroCopy(bool enable);
+  bool getZeroCopy() const {
+    return zeroCopyEnabled_;
+  }
+
+  void setZeroCopyWriteChainThreshold(size_t threshold);
+  size_t getZeroCopyWriteChainThreshold() const {
+    return zeroCopyWriteChainThreshold_;
+  }
+
+  bool isZeroCopyMsg(const cmsghdr& cmsg) const;
+  void processZeroCopyMsg(const cmsghdr& cmsg);
+
   void write(WriteCallback* callback, const void* buf, size_t bytes,
              WriteFlags flags = WriteFlags::NONE) override;
   void writev(WriteCallback* callback, const iovec* vec, size_t count,
@@ -761,6 +780,10 @@ class AsyncSocket : virtual public AsyncTransportWrapper {
     noTransparentTls_ = true;
   }
 
+  void disableTSocks() {
+    noTSocks_ = true;
+  }
+
   enum class StateEnum : uint8_t {
     UNINIT,
     CONNECTING,
@@ -778,6 +801,19 @@ class AsyncSocket : virtual public AsyncTransportWrapper {
     evbChangeCb_ = std::move(cb);
   }
 
+  /**
+   * Attempt to cache the current local and peer addresses (if not already
+   * cached) so that they are available from getPeerAddress() and
+   * getLocalAddress() even after the socket is closed.
+   */
+  void cacheAddresses();
+
+  /**
+   * Returns true if there is any zero copy write in progress
+   * Needs to be called from within the socket's EVB thread
+   */
+  bool isZeroCopyWriteInProgress() const noexcept;
+
   /**
    * writeReturn is the total number of bytes written, or WRITE_ERROR on error.
    * If no data has been written, 0 is returned.
@@ -1115,6 +1151,38 @@ class AsyncSocket : virtual public AsyncTransportWrapper {
 
   std::string withAddr(const std::string& s);
 
+  void cacheLocalAddress() const;
+  void cachePeerAddress() const;
+
+  bool isZeroCopyRequest(WriteFlags flags);
+  uint32_t getNextZeroCopyBuffId() {
+    return zeroCopyBuffId_++;
+  }
+  void adjustZeroCopyFlags(folly::IOBuf* buf, folly::WriteFlags& flags);
+  void adjustZeroCopyFlags(
+      const iovec* vec,
+      uint32_t count,
+      folly::WriteFlags& flags);
+  void addZeroCopyBuf(std::unique_ptr<folly::IOBuf>&& buf);
+  void addZeroCopyBuf(folly::IOBuf* ptr);
+  void setZeroCopyBuf(std::unique_ptr<folly::IOBuf>&& buf);
+  bool containsZeroCopyBuf(folly::IOBuf* ptr);
+  void releaseZeroCopyBuf(uint32_t id);
+
+  // a folly::IOBuf can be used in multiple partial requests
+  // there is a that maps a buffer id to a raw folly::IOBuf ptr
+  // and another one that adds a ref count for a folly::IOBuf that is either
+  // the original ptr or nullptr
+  uint32_t zeroCopyBuffId_{0};
+
+  struct IOBufInfo {
+    uint32_t count_{0};
+    std::unique_ptr<folly::IOBuf> buf_;
+  };
+
+  std::unordered_map<uint32_t, folly::IOBuf*> idZeroCopyBufPtrMap_;
+  std::unordered_map<folly::IOBuf*, IOBufInfo> idZeroCopyBufInfoMap_;
+
   StateEnum state_;                      ///< StateEnum describing current state
   uint8_t shutdownFlags_;                ///< Shutdown state (ShutdownFlags)
   uint16_t eventFlags_;                  ///< EventBase::HandlerFlags settings
@@ -1124,6 +1192,11 @@ class AsyncSocket : virtual public AsyncTransportWrapper {
                                          ///< The address we are connecting from
   uint32_t sendTimeout_;                 ///< The send timeout, in milliseconds
   uint16_t maxReadsPerEvent_;            ///< Max reads per event loop iteration
+
+  bool isBufferMovable_{false};
+
+  int8_t readErr_{READ_NO_ERROR}; ///< The read error encountered, if any
+
   EventBase* eventBase_;                 ///< The EventBase
   WriteTimeout writeTimeout_;            ///< A timeout for connect and write
   IoHandler ioHandler_;                  ///< A EventHandler to monitor the fd
@@ -1131,39 +1204,40 @@ class AsyncSocket : virtual public AsyncTransportWrapper {
 
   ConnectCallback* connectCallback_;     ///< ConnectCallback
   ErrMessageCallback* errMessageCallback_; ///< TimestampCallback
-  SendMsgParamsCallback*                 ///< Callback for retreaving
-      sendMsgParamCallback_;             ///< ::sendmsg() parameters
+  SendMsgParamsCallback* ///< Callback for retrieving
+      sendMsgParamCallback_; ///< ::sendmsg() parameters
   ReadCallback* readCallback_;           ///< ReadCallback
   WriteRequest* writeReqHead_;           ///< Chain of WriteRequests
   WriteRequest* writeReqTail_;           ///< End of WriteRequest chain
-  ShutdownSocketSet* shutdownSocketSet_;
+  std::weak_ptr<ShutdownSocketSet> wShutdownSocketSet_;
   size_t appBytesReceived_;              ///< Num of bytes received from socket
   size_t appBytesWritten_;               ///< Num of bytes written to socket
-  bool isBufferMovable_{false};
 
   // Pre-received data, to be returned to read callback before any data from the
   // socket.
   std::unique_ptr<IOBuf> preReceivedData_;
 
-  int8_t readErr_{READ_NO_ERROR};        ///< The read error encountered, if any
-
   std::chrono::steady_clock::time_point connectStartTime_;
   std::chrono::steady_clock::time_point connectEndTime_;
 
   std::chrono::milliseconds connectTimeout_{0};
 
+  std::unique_ptr<EvbChangeCallback> evbChangeCb_{nullptr};
+
   BufferCallback* bufferCallback_{nullptr};
   bool tfoEnabled_{false};
   bool tfoAttempted_{false};
   bool tfoFinished_{false};
   bool noTransparentTls_{false};
+  bool noTSocks_{false};
   // Whether to track EOR or not.
   bool trackEor_{false};
-
-  std::unique_ptr<EvbChangeCallback> evbChangeCb_{nullptr};
+  bool zeroCopyEnabled_{false};
+  bool zeroCopyVal_{false};
+  size_t zeroCopyWriteChainThreshold_{kDefaultZeroCopyThreshold};
 };
 #ifdef _MSC_VER
 #pragma vtordisp(pop)
 #endif
 
-} // folly
+} // namespace folly