if (0 == --iter1->second.count_) {
idZeroCopyBufInfoMap_.erase(iter1);
}
+
+ idZeroCopyBufPtrMap_.erase(iter);
}
void AsyncSocket::setZeroCopyBuf(std::unique_ptr<folly::IOBuf>&& buf) {
return failWrite(__func__, callback, 0, ex);
} else if (countWritten == count) {
// done, add the whole buffer
- if (isZeroCopyRequest(flags)) {
+ if (countWritten && isZeroCopyRequest(flags)) {
addZeroCopyBuf(std::move(ioBuf));
}
// We successfully wrote everything.
return;
} else { // continue writing the next writeReq
// add just the ptr
- if (isZeroCopyRequest(flags)) {
+ if (bytesWritten && isZeroCopyRequest(flags)) {
addZeroCopyBuf(ioBuf.get());
}
if (bufferCallback_) {
#endif // FOLLY_HAVE_MSG_ERRQUEUE
}
+bool AsyncSocket::processZeroCopyWriteInProgress() noexcept {
+ eventBase_->dcheckIsInEventBaseThread();
+ if (idZeroCopyBufPtrMap_.empty()) {
+ return true;
+ }
+
+ handleErrMessages();
+
+ return idZeroCopyBufPtrMap_.empty();
+}
+
void AsyncSocket::handleRead() noexcept {
VLOG(5) << "AsyncSocket::handleRead() this=" << this << ", fd=" << fd_
<< ", state=" << state_;
*/
bool isZeroCopyWriteInProgress() const noexcept;
+ /**
+ * Tries to process the msg error queue
+ * And returns true if there are no more zero copy writes in progress
+ */
+ bool processZeroCopyWriteInProgress() noexcept;
+
/**
* writeReturn is the total number of bytes written, or WRITE_ERROR on error.
* If no data has been written, 0 is returned.
--- /dev/null
+/*
+ * Copyright 2017 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <folly/io/async/test/ZeroCopy.h>
+
+namespace folly {
+
+// ZeroCopyTest
+ZeroCopyTest::ZeroCopyTest(int numLoops, bool zeroCopy, size_t bufferSize)
+ : numLoops_(numLoops),
+ zeroCopy_(zeroCopy),
+ bufferSize_(bufferSize),
+ client_(
+ new ZeroCopyTestAsyncSocket(&evb_, numLoops_, bufferSize_, zeroCopy)),
+ listenSock_(new folly::AsyncServerSocket(&evb_)),
+ server_(&evb_, numLoops_, bufferSize_, zeroCopy) {
+ if (listenSock_) {
+ server_.addCallbackToServerSocket(*listenSock_);
+ }
+}
+
+bool ZeroCopyTest::run() {
+ evb_.runInEventBaseThread([this]() {
+ if (listenSock_) {
+ listenSock_->bind(0);
+ listenSock_->setZeroCopy(zeroCopy_);
+ listenSock_->listen(10);
+ listenSock_->startAccepting();
+
+ connectOne();
+ }
+ });
+
+ evb_.loopForever();
+
+ return !client_->isZeroCopyWriteInProgress();
+}
+
+} // namespace folly
--- /dev/null
+/*
+ * Copyright 2017 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <folly/ExceptionWrapper.h>
+#include <folly/SocketAddress.h>
+#include <folly/io/IOBufQueue.h>
+#include <folly/io/async/AsyncServerSocket.h>
+#include <folly/io/async/AsyncSocket.h>
+#include <folly/io/async/EventBase.h>
+
+namespace folly {
+
+class ZeroCopyTestAsyncSocket {
+ public:
+ explicit ZeroCopyTestAsyncSocket(
+ folly::EventBase* evb,
+ int numLoops,
+ size_t bufferSize,
+ bool zeroCopy)
+ : evb_(evb),
+ numLoops_(numLoops),
+ sock_(new folly::AsyncSocket(evb)),
+ callback_(this),
+ client_(true) {
+ setBufferSize(bufferSize);
+ setZeroCopy(zeroCopy);
+ }
+
+ explicit ZeroCopyTestAsyncSocket(
+ folly::EventBase* evb,
+ int fd,
+ int numLoops,
+ size_t bufferSize,
+ bool zeroCopy)
+ : evb_(evb),
+ numLoops_(numLoops),
+ sock_(new folly::AsyncSocket(evb, fd)),
+ callback_(this),
+ client_(false) {
+ setBufferSize(bufferSize);
+ setZeroCopy(zeroCopy);
+ // enable reads
+ if (sock_) {
+ sock_->setReadCB(&callback_);
+ }
+ }
+
+ ~ZeroCopyTestAsyncSocket() {
+ clearBuffers();
+ }
+
+ void connect(const folly::SocketAddress& remote) {
+ if (sock_) {
+ sock_->connect(&callback_, remote);
+ }
+ }
+
+ bool isZeroCopyWriteInProgress() const {
+ return sock_->isZeroCopyWriteInProgress();
+ }
+
+ private:
+ void setZeroCopy(bool enable) {
+ zeroCopy_ = enable;
+ if (sock_) {
+ sock_->setZeroCopy(zeroCopy_);
+ }
+ }
+
+ void setBufferSize(size_t bufferSize) {
+ clearBuffers();
+ bufferSize_ = bufferSize;
+
+ readBuffer_ = new char[bufferSize_];
+ }
+
+ class Callback : public folly::AsyncSocket::ReadCallback,
+ public folly::AsyncSocket::ConnectCallback {
+ public:
+ explicit Callback(ZeroCopyTestAsyncSocket* parent) : parent_(parent) {}
+
+ void connectSuccess() noexcept override {
+ parent_->sock_->setReadCB(this);
+ parent_->onConnected();
+ }
+
+ void connectErr(const folly::AsyncSocketException& ex) noexcept override {
+ LOG(ERROR) << "Connect error: " << ex.what();
+ parent_->onDataFinish(folly::exception_wrapper(ex));
+ }
+
+ void getReadBuffer(void** bufReturn, size_t* lenReturn) override {
+ parent_->getReadBuffer(bufReturn, lenReturn);
+ }
+
+ void readDataAvailable(size_t len) noexcept override {
+ parent_->readDataAvailable(len);
+ }
+
+ void readEOF() noexcept override {
+ parent_->onDataFinish(folly::exception_wrapper());
+ }
+
+ void readErr(const folly::AsyncSocketException& ex) noexcept override {
+ parent_->onDataFinish(folly::exception_wrapper(ex));
+ }
+
+ private:
+ ZeroCopyTestAsyncSocket* parent_{nullptr};
+ };
+
+ void clearBuffers() {
+ if (readBuffer_) {
+ delete[] readBuffer_;
+ }
+ }
+
+ void getReadBuffer(void** bufReturn, size_t* lenReturn) {
+ *bufReturn = readBuffer_ + readOffset_;
+ *lenReturn = bufferSize_ - readOffset_;
+ }
+
+ void readDataAvailable(size_t len) noexcept {
+ readOffset_ += len;
+ if (readOffset_ == bufferSize_) {
+ readOffset_ = 0;
+ onDataReady();
+ }
+ }
+
+ void onConnected() {
+ setZeroCopy(zeroCopy_);
+ writeBuffer();
+ }
+
+ void onDataReady() {
+ currLoop_++;
+ if (client_ && currLoop_ >= numLoops_) {
+ evb_->runInLoop(
+ [this] { evb_->terminateLoopSoon(); }, false /*thisIteration*/);
+ return;
+ }
+ writeBuffer();
+ }
+
+ void onDataFinish(folly::exception_wrapper) {
+ if (client_) {
+ evb_->terminateLoopSoon();
+ }
+ }
+
+ bool writeBuffer() {
+ // use calloc to make sure the memory is touched
+ // if the memory is just malloc'd, running the zeroCopyOn
+ // and the zeroCopyOff back to back on a system that does not support
+ // zerocopy leads to the second test being much slower
+ writeBuffer_ =
+ folly::IOBuf::takeOwnership(::calloc(1, bufferSize_), bufferSize_);
+
+ if (sock_ && writeBuffer_) {
+ sock_->writeChain(
+ nullptr,
+ std::move(writeBuffer_),
+ zeroCopy_ ? WriteFlags::WRITE_MSG_ZEROCOPY : WriteFlags::NONE);
+ }
+
+ return true;
+ }
+
+ folly::EventBase* evb_;
+ int numLoops_{0};
+ int currLoop_{0};
+ bool zeroCopy_{false};
+
+ folly::AsyncSocket::UniquePtr sock_;
+ Callback callback_;
+
+ size_t bufferSize_{0};
+ size_t readOffset_{0};
+ char* readBuffer_{nullptr};
+ std::unique_ptr<folly::IOBuf> writeBuffer_;
+
+ bool client_;
+};
+
+class ZeroCopyTestServer : public folly::AsyncServerSocket::AcceptCallback {
+ public:
+ explicit ZeroCopyTestServer(
+ folly::EventBase* evb,
+ int numLoops,
+ size_t bufferSize,
+ bool zeroCopy)
+ : evb_(evb),
+ numLoops_(numLoops),
+ bufferSize_(bufferSize),
+ zeroCopy_(zeroCopy) {}
+
+ void addCallbackToServerSocket(folly::AsyncServerSocket& sock) {
+ sock.addAcceptCallback(this, evb_);
+ }
+
+ void connectionAccepted(
+ int fd,
+ const folly::SocketAddress& /* unused */) noexcept override {
+ auto client = std::make_shared<ZeroCopyTestAsyncSocket>(
+ evb_, fd, numLoops_, bufferSize_, zeroCopy_);
+ clients_[client.get()] = client;
+ }
+
+ void acceptError(const std::exception&) noexcept override {}
+
+ private:
+ folly::EventBase* evb_;
+ int numLoops_;
+ size_t bufferSize_;
+ bool zeroCopy_;
+ std::unique_ptr<ZeroCopyTestAsyncSocket> client_;
+ std::unordered_map<
+ ZeroCopyTestAsyncSocket*,
+ std::shared_ptr<ZeroCopyTestAsyncSocket>>
+ clients_;
+};
+
+class ZeroCopyTest {
+ public:
+ explicit ZeroCopyTest(int numLoops, bool zeroCopy, size_t bufferSize);
+ bool run();
+
+ private:
+ void connectOne() {
+ SocketAddress addr = listenSock_->getAddress();
+ client_->connect(addr);
+ }
+
+ int numLoops_;
+ bool zeroCopy_;
+ size_t bufferSize_;
+
+ EventBase evb_;
+ std::unique_ptr<ZeroCopyTestAsyncSocket> client_;
+ folly::AsyncServerSocket::UniquePtr listenSock_;
+ ZeroCopyTestServer server_;
+};
+
+} // namespace folly
*/
#include <folly/Benchmark.h>
-
-#include <folly/ExceptionWrapper.h>
-#include <folly/SocketAddress.h>
-#include <folly/io/IOBufQueue.h>
-#include <folly/io/async/AsyncServerSocket.h>
-#include <folly/io/async/AsyncSocket.h>
-#include <folly/io/async/EventBase.h>
-
+#include <folly/io/async/test/ZeroCopy.h>
#include <folly/portability/GFlags.h>
using namespace folly;
-
-class TestAsyncSocket {
- public:
- explicit TestAsyncSocket(
- folly::EventBase* evb,
- int numLoops,
- size_t bufferSize,
- bool zeroCopy)
- : evb_(evb),
- numLoops_(numLoops),
- sock_(new folly::AsyncSocket(evb)),
- callback_(this),
- client_(true) {
- setBufferSize(bufferSize);
- setZeroCopy(zeroCopy);
- }
-
- explicit TestAsyncSocket(
- folly::EventBase* evb,
- int fd,
- int numLoops,
- size_t bufferSize,
- bool zeroCopy)
- : evb_(evb),
- numLoops_(numLoops),
- sock_(new folly::AsyncSocket(evb, fd)),
- callback_(this),
- client_(false) {
- setBufferSize(bufferSize);
- setZeroCopy(zeroCopy);
- // enable reads
- if (sock_) {
- sock_->setReadCB(&callback_);
- }
- }
-
- ~TestAsyncSocket() {
- clearBuffers();
- }
-
- void connect(const folly::SocketAddress& remote) {
- if (sock_) {
- sock_->connect(&callback_, remote);
- }
- }
-
- private:
- void setZeroCopy(bool enable) {
- zeroCopy_ = enable;
- if (sock_) {
- sock_->setZeroCopy(zeroCopy_);
- }
- }
-
- void setBufferSize(size_t bufferSize) {
- clearBuffers();
- bufferSize_ = bufferSize;
-
- readBuffer_ = new char[bufferSize_];
- }
-
- class Callback : public folly::AsyncSocket::ReadCallback,
- public folly::AsyncSocket::ConnectCallback {
- public:
- explicit Callback(TestAsyncSocket* parent) : parent_(parent) {}
-
- void connectSuccess() noexcept override {
- parent_->sock_->setReadCB(this);
- parent_->onConnected();
- }
-
- void connectErr(const folly::AsyncSocketException& ex) noexcept override {
- LOG(ERROR) << "Connect error: " << ex.what();
- parent_->onDataFinish(folly::exception_wrapper(ex));
- }
-
- void getReadBuffer(void** bufReturn, size_t* lenReturn) override {
- parent_->getReadBuffer(bufReturn, lenReturn);
- }
-
- void readDataAvailable(size_t len) noexcept override {
- parent_->readDataAvailable(len);
- }
-
- void readEOF() noexcept override {
- parent_->onDataFinish(folly::exception_wrapper());
- }
-
- void readErr(const folly::AsyncSocketException& ex) noexcept override {
- parent_->onDataFinish(folly::exception_wrapper(ex));
- }
-
- private:
- TestAsyncSocket* parent_{nullptr};
- };
-
- void clearBuffers() {
- if (readBuffer_) {
- delete[] readBuffer_;
- }
- }
-
- void getReadBuffer(void** bufReturn, size_t* lenReturn) {
- *bufReturn = readBuffer_ + readOffset_;
- *lenReturn = bufferSize_ - readOffset_;
- }
-
- void readDataAvailable(size_t len) noexcept {
- readOffset_ += len;
- if (readOffset_ == bufferSize_) {
- readOffset_ = 0;
- onDataReady();
- }
- }
-
- void onConnected() {
- setZeroCopy(zeroCopy_);
- writeBuffer();
- }
-
- void onDataReady() {
- currLoop_++;
- if (client_ && currLoop_ >= numLoops_) {
- evb_->terminateLoopSoon();
- return;
- }
- writeBuffer();
- }
-
- void onDataFinish(folly::exception_wrapper) {
- if (client_) {
- evb_->terminateLoopSoon();
- }
- }
-
- bool writeBuffer() {
- // use calloc to make sure the memory is touched
- // if the memory is just malloc'd, running the zeroCopyOn
- // and the zeroCopyOff back to back on a system that does not support
- // zerocopy leads to the second test being much slower
- writeBuffer_ =
- folly::IOBuf::takeOwnership(::calloc(1, bufferSize_), bufferSize_);
-
- if (sock_ && writeBuffer_) {
- sock_->writeChain(
- nullptr,
- std::move(writeBuffer_),
- zeroCopy_ ? WriteFlags::WRITE_MSG_ZEROCOPY : WriteFlags::NONE);
- }
-
- return true;
- }
-
- folly::EventBase* evb_;
- int numLoops_{0};
- int currLoop_{0};
- bool zeroCopy_{false};
-
- folly::AsyncSocket::UniquePtr sock_;
- Callback callback_;
-
- size_t bufferSize_{0};
- size_t readOffset_{0};
- char* readBuffer_{nullptr};
- std::unique_ptr<folly::IOBuf> writeBuffer_;
-
- bool client_;
-};
-
-class TestServer : public folly::AsyncServerSocket::AcceptCallback {
- public:
- explicit TestServer(
- folly::EventBase* evb,
- int numLoops,
- size_t bufferSize,
- bool zeroCopy)
- : evb_(evb),
- numLoops_(numLoops),
- bufferSize_(bufferSize),
- zeroCopy_(zeroCopy) {}
-
- void addCallbackToServerSocket(folly::AsyncServerSocket& sock) {
- sock.addAcceptCallback(this, evb_);
- }
-
- void connectionAccepted(
- int fd,
- const folly::SocketAddress& /* unused */) noexcept override {
- auto client = std::make_shared<TestAsyncSocket>(
- evb_, fd, numLoops_, bufferSize_, zeroCopy_);
- clients_[client.get()] = client;
- }
-
- void acceptError(const std::exception&) noexcept override {}
-
- private:
- folly::EventBase* evb_;
- int numLoops_;
- size_t bufferSize_;
- bool zeroCopy_;
- std::unique_ptr<TestAsyncSocket> client_;
- std::unordered_map<TestAsyncSocket*, std::shared_ptr<TestAsyncSocket>>
- clients_;
-};
-
-class Test {
- public:
- explicit Test(int numLoops, bool zeroCopy, size_t bufferSize)
- : numLoops_(numLoops),
- zeroCopy_(zeroCopy),
- bufferSize_(bufferSize),
- client_(new TestAsyncSocket(&evb_, numLoops_, bufferSize_, zeroCopy)),
- listenSock_(new folly::AsyncServerSocket(&evb_)),
- server_(&evb_, numLoops_, bufferSize_, zeroCopy) {
- if (listenSock_) {
- server_.addCallbackToServerSocket(*listenSock_);
- }
- }
-
- void run() {
- evb_.runInEventBaseThread([this]() {
-
- if (listenSock_) {
- listenSock_->bind(0);
- listenSock_->setZeroCopy(zeroCopy_);
- listenSock_->listen(10);
- listenSock_->startAccepting();
-
- connectOne();
- }
- });
-
- evb_.loopForever();
- }
-
- private:
- void connectOne() {
- SocketAddress addr = listenSock_->getAddress();
- client_->connect(addr);
- }
-
- int numLoops_;
- bool zeroCopy_;
- size_t bufferSize_;
-
- EventBase evb_;
- std::unique_ptr<TestAsyncSocket> client_;
- folly::AsyncServerSocket::UniquePtr listenSock_;
- TestServer server_;
-};
-
+namespace {
void runClient(
const std::string& host,
uint16_t port,
<< " bufferSize = " << bufferSize;
EventBase evb;
- std::unique_ptr<TestAsyncSocket> client(
- new TestAsyncSocket(&evb, numLoops, bufferSize, zeroCopy));
+ std::unique_ptr<ZeroCopyTestAsyncSocket> client(
+ new ZeroCopyTestAsyncSocket(&evb, numLoops, bufferSize, zeroCopy));
SocketAddress addr(host, port);
evb.runInEventBaseThread([&]() { client->connect(addr); });
EventBase evb;
folly::AsyncServerSocket::UniquePtr listenSock(
new folly::AsyncServerSocket(&evb));
- TestServer server(&evb, numLoops, bufferSize, zeroCopy);
+ ZeroCopyTestServer server(&evb, numLoops, bufferSize, zeroCopy);
server.addCallbackToServerSocket(*listenSock);
evb.loopForever();
}
+} // namespace
-static auto constexpr kMaxLoops = 200000;
+static auto constexpr kMaxLoops = 20000;
void zeroCopyOn(unsigned /* unused */, size_t bufferSize) {
- Test test(kMaxLoops, true, bufferSize);
+ ZeroCopyTest test(kMaxLoops, true, bufferSize);
test.run();
}
void zeroCopyOff(unsigned /* unused */, size_t bufferSize) {
- Test test(kMaxLoops, false, bufferSize);
+ ZeroCopyTest test(kMaxLoops, false, bufferSize);
test.run();
}
--- /dev/null
+/*
+ * Copyright 2017 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <folly/io/async/test/ZeroCopy.h>
+#include <folly/portability/GTest.h>
+
+using namespace testing;
+using namespace folly;
+
+static auto constexpr kMaxLoops = 20;
+static auto constexpr kBufferSize = 4096;
+
+TEST(ZeroCopyTest, zero_copy_in_progress) {
+ ZeroCopyTest test(kMaxLoops, true, kBufferSize);
+ CHECK(test.run());
+}