#include <sys/types.h>
#include <chrono>
-#include <memory>
#include <map>
+#include <memory>
namespace folly {
#define SO_NO_TRANSPARENT_TLS 200
#endif
+#if defined __linux__ && !defined SO_NO_TSOCKS
+#define SO_NO_TSOCKS 201
+#endif
+
#ifdef _MSC_VER
// We do a dynamic_cast on this, in
// AsyncTransportWrapper::getUnderlyingTransport so be safe and
*
* @param flags Write flags requested for the given write operation
*/
- int getFlags(folly::WriteFlags flags) noexcept {
- return getFlagsImpl(flags, getDefaultFlags(flags));
+ int getFlags(folly::WriteFlags flags, bool zeroCopyEnabled) noexcept {
+ return getFlagsImpl(flags, getDefaultFlags(flags, zeroCopyEnabled));
}
/**
*
* @param flags Write flags requested for the given write operation
*/
- int getDefaultFlags(folly::WriteFlags flags) noexcept;
+ int getDefaultFlags(folly::WriteFlags flags, bool zeroCopyEnabled) noexcept;
};
explicit AsyncSocket();
*/
explicit AsyncSocket(EventBase* evb);
- void setShutdownSocketSet(ShutdownSocketSet* ss);
+ void setShutdownSocketSet(const std::weak_ptr<ShutdownSocketSet>& wSS);
/**
* Create a new AsyncSocket and begin the connection process.
void setReadCB(ReadCallback* callback) override;
ReadCallback* getReadCallback() const override;
+ static const size_t kDefaultZeroCopyThreshold = 0;
+
+ bool setZeroCopy(bool enable);
+ bool getZeroCopy() const {
+ return zeroCopyEnabled_;
+ }
+
+ void setZeroCopyWriteChainThreshold(size_t threshold);
+ size_t getZeroCopyWriteChainThreshold() const {
+ return zeroCopyWriteChainThreshold_;
+ }
+
+ bool isZeroCopyMsg(const cmsghdr& cmsg) const;
+ void processZeroCopyMsg(const cmsghdr& cmsg);
+
void write(WriteCallback* callback, const void* buf, size_t bytes,
WriteFlags flags = WriteFlags::NONE) override;
void writev(WriteCallback* callback, const iovec* vec, size_t count,
noTransparentTls_ = true;
}
+ void disableTSocks() {
+ noTSocks_ = true;
+ }
+
enum class StateEnum : uint8_t {
UNINIT,
CONNECTING,
evbChangeCb_ = std::move(cb);
}
+ /**
+ * Attempt to cache the current local and peer addresses (if not already
+ * cached) so that they are available from getPeerAddress() and
+ * getLocalAddress() even after the socket is closed.
+ */
+ void cacheAddresses();
+
+ /**
+ * Returns true if there is any zero copy write in progress
+ * Needs to be called from within the socket's EVB thread
+ */
+ bool isZeroCopyWriteInProgress() const noexcept;
+
/**
* writeReturn is the total number of bytes written, or WRITE_ERROR on error.
* If no data has been written, 0 is returned.
std::string withAddr(const std::string& s);
+ void cacheLocalAddress() const;
+ void cachePeerAddress() const;
+
+ bool isZeroCopyRequest(WriteFlags flags);
+ uint32_t getNextZeroCopyBuffId() {
+ return zeroCopyBuffId_++;
+ }
+ void adjustZeroCopyFlags(folly::IOBuf* buf, folly::WriteFlags& flags);
+ void adjustZeroCopyFlags(
+ const iovec* vec,
+ uint32_t count,
+ folly::WriteFlags& flags);
+ void addZeroCopyBuf(std::unique_ptr<folly::IOBuf>&& buf);
+ void addZeroCopyBuf(folly::IOBuf* ptr);
+ void setZeroCopyBuf(std::unique_ptr<folly::IOBuf>&& buf);
+ bool containsZeroCopyBuf(folly::IOBuf* ptr);
+ void releaseZeroCopyBuf(uint32_t id);
+
+ // a folly::IOBuf can be used in multiple partial requests
+ // there is a that maps a buffer id to a raw folly::IOBuf ptr
+ // and another one that adds a ref count for a folly::IOBuf that is either
+ // the original ptr or nullptr
+ uint32_t zeroCopyBuffId_{0};
+
+ struct IOBufInfo {
+ uint32_t count_{0};
+ std::unique_ptr<folly::IOBuf> buf_;
+ };
+
+ std::unordered_map<uint32_t, folly::IOBuf*> idZeroCopyBufPtrMap_;
+ std::unordered_map<folly::IOBuf*, IOBufInfo> idZeroCopyBufInfoMap_;
+
StateEnum state_; ///< StateEnum describing current state
uint8_t shutdownFlags_; ///< Shutdown state (ShutdownFlags)
uint16_t eventFlags_; ///< EventBase::HandlerFlags settings
///< The address we are connecting from
uint32_t sendTimeout_; ///< The send timeout, in milliseconds
uint16_t maxReadsPerEvent_; ///< Max reads per event loop iteration
+
+ bool isBufferMovable_{false};
+
+ int8_t readErr_{READ_NO_ERROR}; ///< The read error encountered, if any
+
EventBase* eventBase_; ///< The EventBase
WriteTimeout writeTimeout_; ///< A timeout for connect and write
IoHandler ioHandler_; ///< A EventHandler to monitor the fd
ConnectCallback* connectCallback_; ///< ConnectCallback
ErrMessageCallback* errMessageCallback_; ///< TimestampCallback
- SendMsgParamsCallback* ///< Callback for retreaving
- sendMsgParamCallback_; ///< ::sendmsg() parameters
+ SendMsgParamsCallback* ///< Callback for retrieving
+ sendMsgParamCallback_; ///< ::sendmsg() parameters
ReadCallback* readCallback_; ///< ReadCallback
WriteRequest* writeReqHead_; ///< Chain of WriteRequests
WriteRequest* writeReqTail_; ///< End of WriteRequest chain
- ShutdownSocketSet* shutdownSocketSet_;
+ std::weak_ptr<ShutdownSocketSet> wShutdownSocketSet_;
size_t appBytesReceived_; ///< Num of bytes received from socket
size_t appBytesWritten_; ///< Num of bytes written to socket
- bool isBufferMovable_{false};
// Pre-received data, to be returned to read callback before any data from the
// socket.
std::unique_ptr<IOBuf> preReceivedData_;
- int8_t readErr_{READ_NO_ERROR}; ///< The read error encountered, if any
-
std::chrono::steady_clock::time_point connectStartTime_;
std::chrono::steady_clock::time_point connectEndTime_;
std::chrono::milliseconds connectTimeout_{0};
+ std::unique_ptr<EvbChangeCallback> evbChangeCb_{nullptr};
+
BufferCallback* bufferCallback_{nullptr};
bool tfoEnabled_{false};
bool tfoAttempted_{false};
bool tfoFinished_{false};
bool noTransparentTls_{false};
+ bool noTSocks_{false};
// Whether to track EOR or not.
bool trackEor_{false};
-
- std::unique_ptr<EvbChangeCallback> evbChangeCb_{nullptr};
+ bool zeroCopyEnabled_{false};
+ bool zeroCopyVal_{false};
+ size_t zeroCopyWriteChainThreshold_{kDefaultZeroCopyThreshold};
};
#ifdef _MSC_VER
#pragma vtordisp(pop)
#endif
-} // folly
+} // namespace folly