From fbc4c23895b0ee3874d9a36401d580a2a8957ba9 Mon Sep 17 00:00:00 2001 From: Dan Melnic Date: Sun, 8 Oct 2017 18:27:54 -0700 Subject: [PATCH] Add SO_ZEROCOPY support Summary: Add SO_ZEROCOPY support Reviewed By: djwatson Differential Revision: D5851637 fbshipit-source-id: 5378b7e44ce9d888ae08527506218998974d4309 --- folly/io/async/AsyncSSLSocket.cpp | 3 +- folly/io/async/AsyncServerSocket.cpp | 19 ++ folly/io/async/AsyncServerSocket.h | 5 + folly/io/async/AsyncSocket.cpp | 220 ++++++++++++- folly/io/async/AsyncSocket.h | 54 ++- folly/io/async/AsyncTransport.h | 4 + folly/io/async/test/AsyncSSLSocketTest.h | 4 +- folly/io/async/test/ZeroCopyBenchmark.cpp | 379 ++++++++++++++++++++++ folly/portability/Sockets.h | 26 +- 9 files changed, 692 insertions(+), 22 deletions(-) create mode 100644 folly/io/async/test/ZeroCopyBenchmark.cpp diff --git a/folly/io/async/AsyncSSLSocket.cpp b/folly/io/async/AsyncSSLSocket.cpp index 2b1aa88e..a30cc7ca 100644 --- a/folly/io/async/AsyncSSLSocket.cpp +++ b/folly/io/async/AsyncSSLSocket.cpp @@ -1646,7 +1646,8 @@ int AsyncSSLSocket::bioWrite(BIO* b, const char* in, int inl) { flags |= WriteFlags::CORK; } - int msg_flags = tsslSock->getSendMsgParamsCB()->getFlags(flags); + int msg_flags = tsslSock->getSendMsgParamsCB()->getFlags( + flags, false /*zeroCopyEnabled*/); msg.msg_controllen = tsslSock->getSendMsgParamsCB()->getAncillaryDataSize(flags); CHECK_GE(AsyncSocket::SendMsgParamsCallback::maxAncillaryDataSize, diff --git a/folly/io/async/AsyncServerSocket.cpp b/folly/io/async/AsyncServerSocket.cpp index 8a70d77c..c9f391f0 100644 --- a/folly/io/async/AsyncServerSocket.cpp +++ b/folly/io/async/AsyncServerSocket.cpp @@ -39,6 +39,13 @@ namespace fsp = folly::portability::sockets; namespace folly { +static constexpr bool msgErrQueueSupported = +#ifdef MSG_ERRQUEUE + true; +#else + false; +#endif // MSG_ERRQUEUE + const uint32_t AsyncServerSocket::kDefaultMaxAcceptAtOnce; const uint32_t AsyncServerSocket::kDefaultCallbackAcceptAtOnce; const uint32_t AsyncServerSocket::kDefaultMaxMessagesInQueue; @@ -331,6 +338,18 @@ void AsyncServerSocket::bindSocket( } } +bool AsyncServerSocket::setZeroCopy(bool enable) { + if (msgErrQueueSupported) { + int fd = getSocket(); + int val = enable ? 1 : 0; + int ret = setsockopt(fd, SOL_SOCKET, SO_ZEROCOPY, &val, sizeof(val)); + + return (0 == ret); + } + + return false; +} + void AsyncServerSocket::bind(const SocketAddress& address) { if (eventBase_) { eventBase_->dcheckIsInEventBaseThread(); diff --git a/folly/io/async/AsyncServerSocket.h b/folly/io/async/AsyncServerSocket.h index fad20a01..6589c667 100644 --- a/folly/io/async/AsyncServerSocket.h +++ b/folly/io/async/AsyncServerSocket.h @@ -319,6 +319,11 @@ class AsyncServerSocket : public DelayedDestruction } } + /* enable zerocopy support for the server sockets - the s = accept sockets + * inherit it + */ + bool setZeroCopy(bool enable); + /** * Bind to the specified address. * diff --git a/folly/io/async/AsyncSocket.cpp b/folly/io/async/AsyncSocket.cpp index 7f8c5f13..abbec9b9 100644 --- a/folly/io/async/AsyncSocket.cpp +++ b/folly/io/async/AsyncSocket.cpp @@ -104,9 +104,28 @@ class AsyncSocket::BytesWriteRequest : public AsyncSocket::WriteRequest { if (getNext() != nullptr) { writeFlags |= WriteFlags::CORK; } + + socket_->adjustZeroCopyFlags(getOps(), getOpCount(), writeFlags); + auto writeResult = socket_->performWrite( getOps(), getOpCount(), writeFlags, &opsWritten_, &partialBytes_); bytesWritten_ = writeResult.writeReturn > 0 ? writeResult.writeReturn : 0; + if (bytesWritten_) { + if (socket_->isZeroCopyRequest(writeFlags)) { + if (isComplete()) { + socket_->addZeroCopyBuff(std::move(ioBuf_)); + } else { + socket_->addZeroCopyBuff(ioBuf_.get()); + } + } else { + // this happens if at least one of the prev requests were sent + // with zero copy but not the last one + if (isComplete() && socket_->getZeroCopy() && + socket_->containsZeroCopyBuff(ioBuf_.get())) { + socket_->setZeroCopyBuff(std::move(ioBuf_)); + } + } + } return writeResult; } @@ -119,11 +138,13 @@ class AsyncSocket::BytesWriteRequest : public AsyncSocket::WriteRequest { opIndex_ += opsWritten_; assert(opIndex_ < opCount_); - // If we've finished writing any IOBufs, release them - if (ioBuf_) { - for (uint32_t i = opsWritten_; i != 0; --i) { - assert(ioBuf_); - ioBuf_ = ioBuf_->pop(); + if (!socket_->isZeroCopyRequest(flags_)) { + // If we've finished writing any IOBufs, release them + if (ioBuf_) { + for (uint32_t i = opsWritten_; i != 0; --i) { + assert(ioBuf_); + ioBuf_ = ioBuf_->pop(); + } } } @@ -185,8 +206,9 @@ class AsyncSocket::BytesWriteRequest : public AsyncSocket::WriteRequest { struct iovec writeOps_[]; ///< write operation(s) list }; -int AsyncSocket::SendMsgParamsCallback::getDefaultFlags(folly::WriteFlags flags) - noexcept { +int AsyncSocket::SendMsgParamsCallback::getDefaultFlags( + folly::WriteFlags flags, + bool zeroCopyEnabled) noexcept { int msg_flags = MSG_DONTWAIT; #ifdef MSG_NOSIGNAL // Linux-only @@ -205,6 +227,10 @@ int AsyncSocket::SendMsgParamsCallback::getDefaultFlags(folly::WriteFlags flags) msg_flags |= MSG_EOR; } + if (zeroCopyEnabled && isSet(flags, WriteFlags::WRITE_MSG_ZEROCOPY)) { + msg_flags |= MSG_ZEROCOPY; + } + return msg_flags; } @@ -433,8 +459,11 @@ void AsyncSocket::connect(ConnectCallback* callback, // By default, turn on TCP_NODELAY // If setNoDelay() fails, we continue anyway; this isn't a fatal error. // setNoDelay() will log an error message if it fails. + // Also set the cached zeroCopyVal_ since it cannot be set earlier if the fd + // is not created if (address.getFamily() != AF_UNIX) { (void)setNoDelay(true); + setZeroCopy(zeroCopyVal_); } VLOG(5) << "AsyncSocket::connect(this=" << this << ", evb=" << eventBase_ @@ -772,6 +801,156 @@ AsyncSocket::ReadCallback* AsyncSocket::getReadCallback() const { return readCallback_; } +bool AsyncSocket::setZeroCopy(bool enable) { + if (msgErrQueueSupported) { + zeroCopyVal_ = enable; + + if (fd_ < 0) { + return false; + } + + int val = enable ? 1 : 0; + int ret = setsockopt(fd_, SOL_SOCKET, SO_ZEROCOPY, &val, sizeof(val)); + + // if enable == false, set zeroCopyEnabled_ = false regardless + // if SO_ZEROCOPY is set or not + if (!enable) { + zeroCopyEnabled_ = enable; + return true; + } + + /* if the setsockopt failed, try to see if the socket inherited the flag + * since we cannot set SO_ZEROCOPY on a socket s = accept + */ + if (ret) { + val = 0; + socklen_t optlen = sizeof(val); + ret = getsockopt(fd_, SOL_SOCKET, SO_ZEROCOPY, &val, &optlen); + + if (!ret) { + enable = val ? true : false; + } + } + + if (!ret) { + zeroCopyEnabled_ = enable; + + return true; + } + } + + return false; +} + +void AsyncSocket::setZeroCopyWriteChainThreshold(size_t threshold) { + zeroCopyWriteChainThreshold_ = threshold; +} + +bool AsyncSocket::isZeroCopyRequest(WriteFlags flags) { + return (zeroCopyEnabled_ && isSet(flags, WriteFlags::WRITE_MSG_ZEROCOPY)); +} + +void AsyncSocket::adjustZeroCopyFlags( + folly::IOBuf* buf, + folly::WriteFlags& flags) { + if (zeroCopyEnabled_ && zeroCopyWriteChainThreshold_ && buf) { + if (buf->computeChainDataLength() >= zeroCopyWriteChainThreshold_) { + flags |= folly::WriteFlags::WRITE_MSG_ZEROCOPY; + } else { + flags = unSet(flags, folly::WriteFlags::WRITE_MSG_ZEROCOPY); + } + } +} + +void AsyncSocket::adjustZeroCopyFlags( + const iovec* vec, + uint32_t count, + folly::WriteFlags& flags) { + if (zeroCopyEnabled_ && zeroCopyWriteChainThreshold_) { + count = std::min(count, kIovMax); + size_t sum = 0; + for (uint32_t i = 0; i < count; ++i) { + const iovec* v = vec + i; + sum += v->iov_len; + } + + if (sum >= zeroCopyWriteChainThreshold_) { + flags |= folly::WriteFlags::WRITE_MSG_ZEROCOPY; + } else { + flags = unSet(flags, folly::WriteFlags::WRITE_MSG_ZEROCOPY); + } + } +} + +void AsyncSocket::addZeroCopyBuff(std::unique_ptr&& buf) { + uint32_t id = getNextZeroCopyBuffId(); + folly::IOBuf* ptr = buf.get(); + + idZeroCopyBufPtrMap_[id] = ptr; + auto& p = idZeroCopyBufPtrToBufMap_[ptr]; + p.first++; + CHECK(p.second.get() == nullptr); + p.second = std::move(buf); +} + +void AsyncSocket::addZeroCopyBuff(folly::IOBuf* ptr) { + uint32_t id = getNextZeroCopyBuffId(); + idZeroCopyBufPtrMap_[id] = ptr; + + idZeroCopyBufPtrToBufMap_[ptr].first++; +} + +void AsyncSocket::releaseZeroCopyBuff(uint32_t id) { + auto iter = idZeroCopyBufPtrMap_.find(id); + CHECK(iter != idZeroCopyBufPtrMap_.end()); + auto ptr = iter->second; + auto iter1 = idZeroCopyBufPtrToBufMap_.find(ptr); + CHECK(iter1 != idZeroCopyBufPtrToBufMap_.end()); + if (0 == --iter1->second.first) { + idZeroCopyBufPtrToBufMap_.erase(iter1); + } +} + +void AsyncSocket::setZeroCopyBuff(std::unique_ptr&& buf) { + folly::IOBuf* ptr = buf.get(); + auto& p = idZeroCopyBufPtrToBufMap_[ptr]; + CHECK(p.second.get() == nullptr); + + p.second = std::move(buf); +} + +bool AsyncSocket::containsZeroCopyBuff(folly::IOBuf* ptr) { + return ( + idZeroCopyBufPtrToBufMap_.find(ptr) != idZeroCopyBufPtrToBufMap_.end()); +} + +bool AsyncSocket::isZeroCopyMsg(const cmsghdr& cmsg) const { +#ifdef MSG_ERRQUEUE + if (zeroCopyEnabled_ && + ((cmsg.cmsg_level == SOL_IP && cmsg.cmsg_type == IP_RECVERR) || + (cmsg.cmsg_level == SOL_IPV6 && cmsg.cmsg_type == IPV6_RECVERR))) { + const struct sock_extended_err* serr = + reinterpret_cast(CMSG_DATA(&cmsg)); + return ( + (serr->ee_errno == 0) && (serr->ee_origin == SO_EE_ORIGIN_ZEROCOPY)); + } +#endif + return false; +} + +void AsyncSocket::processZeroCopyMsg(const cmsghdr& cmsg) { +#ifdef MSG_ERRQUEUE + const struct sock_extended_err* serr = + reinterpret_cast(CMSG_DATA(&cmsg)); + uint32_t hi = serr->ee_data; + uint32_t lo = serr->ee_info; + + for (uint32_t i = lo; i <= hi; i++) { + releaseZeroCopyBuff(i); + } +#endif +} + void AsyncSocket::write(WriteCallback* callback, const void* buf, size_t bytes, WriteFlags flags) { iovec op; @@ -789,6 +968,8 @@ void AsyncSocket::writev(WriteCallback* callback, void AsyncSocket::writeChain(WriteCallback* callback, unique_ptr&& buf, WriteFlags flags) { + adjustZeroCopyFlags(buf.get(), flags); + constexpr size_t kSmallSizeMax = 64; size_t count = buf->countChainElements(); if (count <= kSmallSizeMax) { @@ -860,6 +1041,10 @@ void AsyncSocket::writeImpl(WriteCallback* callback, const iovec* vec, errnoCopy); return failWrite(__func__, callback, 0, ex); } else if (countWritten == count) { + // done, add the whole buffer + if (isZeroCopyRequest(flags)) { + addZeroCopyBuff(std::move(ioBuf)); + } // We successfully wrote everything. // Invoke the callback and return. if (callback) { @@ -867,6 +1052,10 @@ void AsyncSocket::writeImpl(WriteCallback* callback, const iovec* vec, } return; } else { // continue writing the next writeReq + // add just the ptr + if (isZeroCopyRequest(flags)) { + addZeroCopyBuff(ioBuf.get()); + } if (bufferCallback_) { bufferCallback_->onEgressBuffered(); } @@ -1545,7 +1734,8 @@ void AsyncSocket::handleErrMessages() noexcept { // supporting per-socket error queues. VLOG(5) << "AsyncSocket::handleErrMessages() this=" << this << ", fd=" << fd_ << ", state=" << state_; - if (errMessageCallback_ == nullptr) { + if (errMessageCallback_ == nullptr && + (!zeroCopyEnabled_ || idZeroCopyBufPtrMap_.empty())) { VLOG(7) << "AsyncSocket::handleErrMessages(): " << "no callback installed - exiting."; return; @@ -1587,11 +1777,15 @@ void AsyncSocket::handleErrMessages() noexcept { } for (struct cmsghdr* cmsg = CMSG_FIRSTHDR(&msg); - cmsg != nullptr && - cmsg->cmsg_len != 0 && - errMessageCallback_ != nullptr; + cmsg != nullptr && cmsg->cmsg_len != 0; cmsg = CMSG_NXTHDR(&msg, cmsg)) { - errMessageCallback_->errMessage(*cmsg); + if (isZeroCopyMsg(*cmsg)) { + processZeroCopyMsg(*cmsg); + } else { + if (errMessageCallback_) { + errMessageCallback_->errMessage(*cmsg); + } + } } } #endif //MSG_ERRQUEUE @@ -2127,7 +2321,7 @@ AsyncSocket::WriteResult AsyncSocket::performWrite( } else { msg.msg_control = nullptr; } - int msg_flags = sendMsgParamCallback_->getFlags(flags); + int msg_flags = sendMsgParamCallback_->getFlags(flags, zeroCopyEnabled_); auto writeResult = sendSocketMessage(fd_, &msg, msg_flags); auto totalWritten = writeResult.writeReturn; diff --git a/folly/io/async/AsyncSocket.h b/folly/io/async/AsyncSocket.h index beff1a07..e99300fb 100644 --- a/folly/io/async/AsyncSocket.h +++ b/folly/io/async/AsyncSocket.h @@ -156,8 +156,8 @@ class AsyncSocket : virtual public AsyncTransportWrapper { * * @param flags Write flags requested for the given write operation */ - int getFlags(folly::WriteFlags flags) noexcept { - return getFlagsImpl(flags, getDefaultFlags(flags)); + int getFlags(folly::WriteFlags flags, bool zeroCopyEnabled) noexcept { + return getFlagsImpl(flags, getDefaultFlags(flags, zeroCopyEnabled)); } /** @@ -211,7 +211,7 @@ class AsyncSocket : virtual public AsyncTransportWrapper { * * @param flags Write flags requested for the given write operation */ - int getDefaultFlags(folly::WriteFlags flags) noexcept; + int getDefaultFlags(folly::WriteFlags flags, bool zeroCopyEnabled) noexcept; }; explicit AsyncSocket(); @@ -504,6 +504,21 @@ class AsyncSocket : virtual public AsyncTransportWrapper { void setReadCB(ReadCallback* callback) override; ReadCallback* getReadCallback() const override; + static const size_t kDefaultZeroCopyThreshold = 32768; // 32KB + + bool setZeroCopy(bool enable); + bool getZeroCopy() const { + return zeroCopyEnabled_; + } + + void setZeroCopyWriteChainThreshold(size_t threshold); + size_t getZeroCopyWriteChainThreshold() const { + return zeroCopyWriteChainThreshold_; + } + + bool isZeroCopyMsg(const cmsghdr& cmsg) const; + void processZeroCopyMsg(const cmsghdr& cmsg); + void write(WriteCallback* callback, const void* buf, size_t bytes, WriteFlags flags = WriteFlags::NONE) override; void writev(WriteCallback* callback, const iovec* vec, size_t count, @@ -1133,6 +1148,32 @@ class AsyncSocket : virtual public AsyncTransportWrapper { void cacheLocalAddress() const; void cachePeerAddress() const; + bool isZeroCopyRequest(WriteFlags flags); + uint32_t getNextZeroCopyBuffId() { + return zeroCopyBuffId_++; + } + void adjustZeroCopyFlags(folly::IOBuf* buf, folly::WriteFlags& flags); + void adjustZeroCopyFlags( + const iovec* vec, + uint32_t count, + folly::WriteFlags& flags); + void addZeroCopyBuff(std::unique_ptr&& buf); + void addZeroCopyBuff(folly::IOBuf* ptr); + void setZeroCopyBuff(std::unique_ptr&& buf); + bool containsZeroCopyBuff(folly::IOBuf* ptr); + void releaseZeroCopyBuff(uint32_t id); + + // a folly::IOBuf can be used in multiple partial requests + // so we keep a map that maps a buffer id to a raw folly::IOBuf ptr + // and one more map that adds a ref count for a folly::IOBuf that is either + // the original ptr or nullptr + uint32_t zeroCopyBuffId_{0}; + std::unordered_map idZeroCopyBufPtrMap_; + std::unordered_map< + folly::IOBuf*, + std::pair>> + idZeroCopyBufPtrToBufMap_; + StateEnum state_; ///< StateEnum describing current state uint8_t shutdownFlags_; ///< Shutdown state (ShutdownFlags) uint16_t eventFlags_; ///< EventBase::HandlerFlags settings @@ -1149,8 +1190,8 @@ class AsyncSocket : virtual public AsyncTransportWrapper { ConnectCallback* connectCallback_; ///< ConnectCallback ErrMessageCallback* errMessageCallback_; ///< TimestampCallback - SendMsgParamsCallback* ///< Callback for retreaving - sendMsgParamCallback_; ///< ::sendmsg() parameters + SendMsgParamsCallback* ///< Callback for retrieving + sendMsgParamCallback_; ///< ::sendmsg() parameters ReadCallback* readCallback_; ///< ReadCallback WriteRequest* writeReqHead_; ///< Chain of WriteRequests WriteRequest* writeReqTail_; ///< End of WriteRequest chain @@ -1178,6 +1219,9 @@ class AsyncSocket : virtual public AsyncTransportWrapper { bool noTSocks_{false}; // Whether to track EOR or not. bool trackEor_{false}; + bool zeroCopyEnabled_{false}; + bool zeroCopyVal_{false}; + size_t zeroCopyWriteChainThreshold_{kDefaultZeroCopyThreshold}; std::unique_ptr evbChangeCb_{nullptr}; }; diff --git a/folly/io/async/AsyncTransport.h b/folly/io/async/AsyncTransport.h index ca5cc4d2..c42888da 100644 --- a/folly/io/async/AsyncTransport.h +++ b/folly/io/async/AsyncTransport.h @@ -60,6 +60,10 @@ enum class WriteFlags : uint32_t { * this indicates that only the write side of socket should be shutdown */ WRITE_SHUTDOWN = 0x04, + /* + * use msg zerocopy if allowed + */ + WRITE_MSG_ZEROCOPY = 0x08, }; /* diff --git a/folly/io/async/test/AsyncSSLSocketTest.h b/folly/io/async/test/AsyncSSLSocketTest.h index b916c933..122bb8f7 100644 --- a/folly/io/async/test/AsyncSSLSocketTest.h +++ b/folly/io/async/test/AsyncSSLSocketTest.h @@ -60,7 +60,7 @@ class SendMsgParamsCallbackBase : int getFlagsImpl(folly::WriteFlags flags, int /*defaultFlags*/) noexcept override { - return oldCallback_->getFlags(flags); + return oldCallback_->getFlags(flags, false /*zeroCopyEnabled*/); } void getAncillaryData(folly::WriteFlags flags, void* data) noexcept override { @@ -88,7 +88,7 @@ class SendMsgFlagsCallback : public SendMsgParamsCallbackBase { if (flags_) { return flags_; } else { - return oldCallback_->getFlags(flags); + return oldCallback_->getFlags(flags, false /*zeroCopyEnabled*/); } } diff --git a/folly/io/async/test/ZeroCopyBenchmark.cpp b/folly/io/async/test/ZeroCopyBenchmark.cpp new file mode 100644 index 00000000..7f397e00 --- /dev/null +++ b/folly/io/async/test/ZeroCopyBenchmark.cpp @@ -0,0 +1,379 @@ +/* + * Copyright 2017 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include +#include +#include +#include +#include +#include + +#include + +using namespace folly; + +class TestAsyncSocket { + public: + explicit TestAsyncSocket( + folly::EventBase* evb, + int numLoops, + size_t bufferSize, + bool zeroCopy) + : evb_(evb), + numLoops_(numLoops), + sock_(new folly::AsyncSocket(evb)), + callback_(this), + client_(true) { + setBufferSize(bufferSize); + setZeroCopy(zeroCopy); + } + + explicit TestAsyncSocket( + folly::EventBase* evb, + int fd, + int numLoops, + size_t bufferSize, + bool zeroCopy) + : evb_(evb), + numLoops_(numLoops), + sock_(new folly::AsyncSocket(evb, fd)), + callback_(this), + client_(false) { + setBufferSize(bufferSize); + setZeroCopy(zeroCopy); + // enable reads + if (sock_) { + sock_->setReadCB(&callback_); + } + } + + ~TestAsyncSocket() { + clearBuffers(); + } + + void connect(const folly::SocketAddress& remote) { + if (sock_) { + sock_->connect(&callback_, remote); + } + } + + private: + void setZeroCopy(bool enable) { + zeroCopy_ = enable; + if (sock_) { + sock_->setZeroCopy(zeroCopy_); + } + } + + void setBufferSize(size_t bufferSize) { + clearBuffers(); + bufferSize_ = bufferSize; + + readBuffer_ = new char[bufferSize_]; + } + + class Callback : public folly::AsyncSocket::ReadCallback, + public folly::AsyncSocket::ConnectCallback { + public: + explicit Callback(TestAsyncSocket* parent) : parent_(parent) {} + + void connectSuccess() noexcept override { + parent_->sock_->setReadCB(this); + parent_->onConnected(); + } + + void connectErr(const folly::AsyncSocketException& ex) noexcept override { + LOG(ERROR) << "Connect error: " << ex.what(); + parent_->onDataFinish(folly::exception_wrapper(ex)); + } + + void getReadBuffer(void** bufReturn, size_t* lenReturn) override { + parent_->getReadBuffer(bufReturn, lenReturn); + } + + void readDataAvailable(size_t len) noexcept override { + parent_->readDataAvailable(len); + } + + void readEOF() noexcept override { + parent_->onDataFinish(folly::exception_wrapper()); + } + + void readErr(const folly::AsyncSocketException& ex) noexcept override { + parent_->onDataFinish(folly::exception_wrapper(ex)); + } + + private: + TestAsyncSocket* parent_{nullptr}; + }; + + void clearBuffers() { + if (readBuffer_) { + delete[] readBuffer_; + } + } + + void getReadBuffer(void** bufReturn, size_t* lenReturn) { + *bufReturn = readBuffer_ + readOffset_; + *lenReturn = bufferSize_ - readOffset_; + } + + void readDataAvailable(size_t len) noexcept { + readOffset_ += len; + if (readOffset_ == bufferSize_) { + readOffset_ = 0; + onDataReady(); + } + } + + void onConnected() { + setZeroCopy(zeroCopy_); + writeBuffer(); + } + + void onDataReady() { + currLoop_++; + if (client_ && currLoop_ >= numLoops_) { + evb_->terminateLoopSoon(); + return; + } + writeBuffer(); + } + + void onDataFinish(folly::exception_wrapper) { + if (client_) { + evb_->terminateLoopSoon(); + } + } + + bool writeBuffer() { + writeBuffer_ = + folly::IOBuf::takeOwnership(::malloc(bufferSize_), bufferSize_); + + if (sock_ && writeBuffer_) { + sock_->writeChain( + nullptr, + std::move(writeBuffer_), + zeroCopy_ ? WriteFlags::WRITE_MSG_ZEROCOPY : WriteFlags::NONE); + } + + return true; + } + + folly::EventBase* evb_; + int numLoops_{0}; + int currLoop_{0}; + bool zeroCopy_{false}; + + folly::AsyncSocket::UniquePtr sock_; + Callback callback_; + + size_t bufferSize_{0}; + size_t readOffset_{0}; + char* readBuffer_{nullptr}; + std::unique_ptr writeBuffer_; + + bool client_; +}; + +class TestServer : public folly::AsyncServerSocket::AcceptCallback { + public: + explicit TestServer( + folly::EventBase* evb, + int numLoops, + size_t bufferSize, + bool zeroCopy) + : evb_(evb), + numLoops_(numLoops), + bufferSize_(bufferSize), + zeroCopy_(zeroCopy) {} + + void addCallbackToServerSocket(folly::AsyncServerSocket& sock) { + sock.addAcceptCallback(this, evb_); + } + + void connectionAccepted( + int fd, + const folly::SocketAddress& /* unused */) noexcept override { + auto client = std::make_shared( + evb_, fd, numLoops_, bufferSize_, zeroCopy_); + clients_[client.get()] = client; + } + + void acceptError(const std::exception&) noexcept override {} + + private: + folly::EventBase* evb_; + int numLoops_; + size_t bufferSize_; + bool zeroCopy_; + std::unique_ptr client_; + std::unordered_map> + clients_; +}; + +class Test { + public: + explicit Test(int numLoops, bool zeroCopy, size_t bufferSize) + : numLoops_(numLoops), + zeroCopy_(zeroCopy), + bufferSize_(bufferSize), + client_(new TestAsyncSocket(&evb_, numLoops_, bufferSize_, zeroCopy)), + listenSock_(new folly::AsyncServerSocket(&evb_)), + server_(&evb_, numLoops_, bufferSize_, zeroCopy) { + if (listenSock_) { + server_.addCallbackToServerSocket(*listenSock_); + } + } + + void run() { + evb_.runInEventBaseThread([this]() { + + if (listenSock_) { + listenSock_->bind(0); + listenSock_->setZeroCopy(zeroCopy_); + listenSock_->listen(10); + listenSock_->startAccepting(); + + connectOne(); + } + }); + + evb_.loopForever(); + } + + private: + void connectOne() { + SocketAddress addr = listenSock_->getAddress(); + client_->connect(addr); + } + + int numLoops_; + bool zeroCopy_; + size_t bufferSize_; + + EventBase evb_; + std::unique_ptr client_; + folly::AsyncServerSocket::UniquePtr listenSock_; + TestServer server_; +}; + +void runClient( + const std::string& host, + uint16_t port, + int numLoops, + bool zeroCopy, + size_t bufferSize) { + LOG(INFO) << "Running client. host = " << host << " port = " << port + << " numLoops = " << numLoops << " zeroCopy = " << zeroCopy + << " bufferSize = " << bufferSize; + + EventBase evb; + std::unique_ptr client( + new TestAsyncSocket(&evb, numLoops, bufferSize, zeroCopy)); + SocketAddress addr(host, port); + evb.runInEventBaseThread([&]() { client->connect(addr); }); + + evb.loopForever(); +} + +void runServer(uint16_t port, int numLoops, bool zeroCopy, size_t bufferSize) { + LOG(INFO) << "Running server. port = " << port << " numLoops = " << numLoops + << " zeroCopy = " << zeroCopy << " bufferSize = " << bufferSize; + + EventBase evb; + folly::AsyncServerSocket::UniquePtr listenSock( + new folly::AsyncServerSocket(&evb)); + TestServer server(&evb, numLoops, bufferSize, zeroCopy); + + server.addCallbackToServerSocket(*listenSock); + + evb.runInEventBaseThread([&]() { + listenSock->bind(port); + listenSock->setZeroCopy(zeroCopy); + listenSock->listen(10); + listenSock->startAccepting(); + }); + + evb.loopForever(); +} + +static auto constexpr kMaxLoops = 200000; + +void zeroCopyOn(unsigned /* unused */, size_t bufferSize) { + Test test(kMaxLoops, true, bufferSize); + test.run(); +} + +void zeroCopyOff(unsigned /* unused */, size_t bufferSize) { + Test test(kMaxLoops, false, bufferSize); + test.run(); +} + +BENCHMARK_PARAM(zeroCopyOn, 4096); +BENCHMARK_PARAM(zeroCopyOff, 4096); +BENCHMARK_DRAW_LINE() +BENCHMARK_PARAM(zeroCopyOn, 8192); +BENCHMARK_PARAM(zeroCopyOff, 8192); +BENCHMARK_DRAW_LINE() +BENCHMARK_PARAM(zeroCopyOn, 16384); +BENCHMARK_PARAM(zeroCopyOff, 16384); +BENCHMARK_DRAW_LINE() +BENCHMARK_PARAM(zeroCopyOn, 32768); +BENCHMARK_PARAM(zeroCopyOff, 32768); +BENCHMARK_DRAW_LINE() +BENCHMARK_PARAM(zeroCopyOn, 65536); +BENCHMARK_PARAM(zeroCopyOff, 65536); +BENCHMARK_DRAW_LINE() +BENCHMARK_PARAM(zeroCopyOn, 131072); +BENCHMARK_PARAM(zeroCopyOff, 131072); +BENCHMARK_DRAW_LINE() +BENCHMARK_PARAM(zeroCopyOn, 262144); +BENCHMARK_PARAM(zeroCopyOff, 262144); +BENCHMARK_DRAW_LINE() +BENCHMARK_PARAM(zeroCopyOn, 524288); +BENCHMARK_PARAM(zeroCopyOff, 524288); +BENCHMARK_DRAW_LINE() +BENCHMARK_PARAM(zeroCopyOn, 1048576); +BENCHMARK_PARAM(zeroCopyOff, 1048576); +BENCHMARK_DRAW_LINE() + +DEFINE_bool(client, false, "client mode"); +DEFINE_bool(server, false, "server mode"); +DEFINE_bool(zeroCopy, false, "use zerocopy"); +DEFINE_int32(numLoops, kMaxLoops, "number of loops"); +DEFINE_int32(bufferSize, 524288, "buffer size"); +DEFINE_int32(port, 33130, "port"); +DEFINE_string(host, "::1", "host"); + +int main(int argc, char** argv) { + gflags::ParseCommandLineFlags(&argc, &argv, true); + + if (FLAGS_client) { + runClient( + FLAGS_host, + FLAGS_port, + FLAGS_numLoops, + FLAGS_zeroCopy, + FLAGS_bufferSize); + } else if (FLAGS_server) { + runServer(FLAGS_port, FLAGS_numLoops, FLAGS_zeroCopy, FLAGS_bufferSize); + } else { + runBenchmarks(); + } +} diff --git a/folly/portability/Sockets.h b/folly/portability/Sockets.h index f1617fc9..99f1424d 100755 --- a/folly/portability/Sockets.h +++ b/folly/portability/Sockets.h @@ -25,6 +25,24 @@ #include #include #include + +#ifdef MSG_ERRQUEUE +/* for struct sock_extended_err*/ +#include +#endif + +#ifndef SO_EE_ORIGIN_ZEROCOPY +#define SO_EE_ORIGIN_ZEROCOPY 5 +#endif + +#ifndef SO_ZEROCOPY +#define SO_ZEROCOPY 60 +#endif + +#ifndef MSG_ZEROCOPY +#define MSG_ZEROCOPY 0x4000000 +#endif + #else #include #include @@ -35,6 +53,11 @@ using nfds_t = int; using sa_family_t = ADDRESS_FAMILY; +// these are not supported +#define SO_EE_ORIGIN_ZEROCOPY 0 +#define SO_ZEROCOPY 0 +#define MSG_ZEROCOPY 0x0 + // We don't actually support either of these flags // currently. #define MSG_DONTWAIT 0x1000 @@ -198,5 +221,6 @@ int setsockopt( #ifdef _WIN32 // Add our helpers to the overload set. -/* using override */ using namespace folly::portability::sockets; +/* using override */ +using namespace folly::portability::sockets; #endif -- 2.34.1