#include <folly/io/async/EventHandler.h>
#include <folly/io/async/DelayedDestruction.h>
+#include <chrono>
#include <memory>
#include <map>
class ConnectCallback {
public:
- virtual ~ConnectCallback() {}
+ virtual ~ConnectCallback() = default;
/**
* connectSuccess() will be invoked when the connection has been
std::unique_ptr<folly::IOBuf>&& buf,
WriteFlags flags = WriteFlags::NONE) override;
+ class WriteRequest;
+ virtual void writeRequest(WriteRequest* req);
+ void writeRequestReady() {
+ handleWrite();
+ }
+
// Methods inherited from AsyncTransport
void close() override;
void closeNow() override;
bool isEorTrackingEnabled() const override { return false; }
- void setEorTracking(bool track) override {}
+ void setEorTracking(bool /*track*/) override {}
bool connecting() const override {
return (state_ == StateEnum::CONNECTING);
}
+ virtual bool isClosedByPeer() const {
+ return (state_ == StateEnum::CLOSED &&
+ (readErr_ == READ_EOF || readErr_ == READ_ERROR));
+ }
+
+ virtual bool isClosedBySelf() const {
+ return (state_ == StateEnum::CLOSED &&
+ (readErr_ != READ_EOF && readErr_ != READ_ERROR));
+ }
+
size_t getAppBytesWritten() const override {
return appBytesWritten_;
}
return getAppBytesReceived();
}
+ std::chrono::nanoseconds getConnectTime() const {
+ return connectEndTime_ - connectStartTime_;
+ }
+
// Methods controlling socket options
/**
#define SO_SET_NAMESPACE 41
int setTCPProfile(int profd);
+ /**
+ * Set TCP_CORK on the socket, and turn on/off the persistentCork_ flag
+ *
+ * When persistentCork_ is true, CorkGuard in AsyncSSLSocket will not be
+ * able to toggle TCP_CORK
+ *
+ */
+ void setPersistentCork(bool cork);
/**
* Generic API for reading a socket option.
return setsockopt(fd_, level, optname, optval, sizeof(T));
}
+ virtual void setPeek(bool peek) {
+ peek_ = peek;
+ }
+
enum class StateEnum : uint8_t {
UNINIT,
CONNECTING,
ERROR
};
+ /**
+ * A WriteRequest object tracks information about a pending write operation.
+ */
+ class WriteRequest {
+ public:
+ WriteRequest(AsyncSocket* socket, WriteCallback* callback) :
+ socket_(socket), callback_(callback) {}
+
+ virtual void start() {};
+
+ virtual void destroy() = 0;
+
+ virtual bool performWrite() = 0;
+
+ virtual void consume() = 0;
+
+ virtual bool isComplete() = 0;
+
+ WriteRequest* getNext() const {
+ return next_;
+ }
+
+ WriteCallback* getCallback() const {
+ return callback_;
+ }
+
+ uint32_t getTotalBytesWritten() const {
+ return totalBytesWritten_;
+ }
+
+ void append(WriteRequest* next) {
+ assert(next_ == nullptr);
+ next_ = next;
+ }
+
+ void fail(const char* fn, const AsyncSocketException& ex) {
+ socket_->failWrite(fn, ex);
+ }
+
+ void bytesWritten(size_t count) {
+ totalBytesWritten_ += count;
+ socket_->appBytesWritten_ += count;
+ }
+
+ protected:
+ // protected destructor, to ensure callers use destroy()
+ virtual ~WriteRequest() {}
+
+ AsyncSocket* socket_; ///< parent socket
+ WriteRequest* next_{nullptr}; ///< pointer to next WriteRequest
+ WriteCallback* callback_; ///< completion callback
+ uint32_t totalBytesWritten_{0}; ///< total bytes written
+ };
+
protected:
enum ReadResultEnum {
READ_EOF = 0,
READ_ERROR = -1,
READ_BLOCKING = -2,
+ READ_NO_ERROR = -3,
};
/**
SHUT_READ = 0x04,
};
- class WriteRequest;
class BytesWriteRequest;
class WriteTimeout : public AsyncTimeout {
public:
explicit ImmediateReadCB(AsyncSocket* socket) : socket_(socket) {}
void runLoopCallback() noexcept override {
+ DestructorGuard dg(socket_);
socket_->checkForImmediateRead();
}
private:
void ioReady(uint16_t events) noexcept;
virtual void checkForImmediateRead() noexcept;
virtual void handleInitialReadWrite() noexcept;
+ virtual void prepareReadBuffer(void** buf, size_t* buflen) noexcept;
virtual void handleRead() noexcept;
virtual void handleWrite() noexcept;
virtual void handleConnect() noexcept;
* READ_ERROR on error, or READ_BLOCKING if the operation will
* block.
*/
- virtual ssize_t performRead(void* buf, size_t buflen);
+ virtual ssize_t performRead(void** buf, size_t* buflen, size_t* offset);
/**
* Populate an iovec array from an IOBuf and attempt to write it.
const AsyncSocketException& ex);
void failWrite(const char* fn, const AsyncSocketException& ex);
void failAllWrites(const AsyncSocketException& ex);
+ void invokeConnectErr(const AsyncSocketException& ex);
+ void invokeConnectSuccess();
void invalidState(ConnectCallback* callback);
void invalidState(ReadCallback* callback);
void invalidState(WriteCallback* callback);
std::string withAddr(const std::string& s);
+ /**
+ * Set TCP_CORK on this socket
+ *
+ * @return 0 if Cork is turned on, or non-zero errno on error
+ */
+ int setCork(bool cork);
+
StateEnum state_; ///< StateEnum describing current state
uint8_t shutdownFlags_; ///< Shutdown state (ShutdownFlags)
uint16_t eventFlags_; ///< EventBase::HandlerFlags settings
ShutdownSocketSet* shutdownSocketSet_;
size_t appBytesReceived_; ///< Num of bytes received from socket
size_t appBytesWritten_; ///< Num of bytes written to socket
+ bool isBufferMovable_{false};
+
+ bool peek_{false}; // Peek bytes.
+
+ int8_t readErr_{READ_NO_ERROR}; ///< The read error encountered, if any.
+
+ std::chrono::steady_clock::time_point connectStartTime_;
+ std::chrono::steady_clock::time_point connectEndTime_;
+
+ // Whether this connection is persistently corked
+ bool persistentCork_{false};
+ // Whether we've applied the TCP_CORK option to the socket
+ bool corked_{false};
};