if (bytesWritten_) {
if (socket_->isZeroCopyRequest(writeFlags)) {
if (isComplete()) {
- socket_->addZeroCopyBuff(std::move(ioBuf_));
+ socket_->addZeroCopyBuf(std::move(ioBuf_));
} else {
- socket_->addZeroCopyBuff(ioBuf_.get());
+ socket_->addZeroCopyBuf(ioBuf_.get());
}
} else {
// this happens if at least one of the prev requests were sent
// with zero copy but not the last one
if (isComplete() && socket_->getZeroCopy() &&
- socket_->containsZeroCopyBuff(ioBuf_.get())) {
- socket_->setZeroCopyBuff(std::move(ioBuf_));
+ socket_->containsZeroCopyBuf(ioBuf_.get())) {
+ socket_->setZeroCopyBuf(std::move(ioBuf_));
}
}
}
}
}
-void AsyncSocket::addZeroCopyBuff(std::unique_ptr<folly::IOBuf>&& buf) {
+void AsyncSocket::addZeroCopyBuf(std::unique_ptr<folly::IOBuf>&& buf) {
uint32_t id = getNextZeroCopyBuffId();
folly::IOBuf* ptr = buf.get();
idZeroCopyBufPtrMap_[id] = ptr;
- auto& p = idZeroCopyBufPtrToBufMap_[ptr];
- p.first++;
- CHECK(p.second.get() == nullptr);
- p.second = std::move(buf);
+ auto& p = idZeroCopyBufInfoMap_[ptr];
+ p.count_++;
+ CHECK(p.buf_.get() == nullptr);
+ p.buf_ = std::move(buf);
}
-void AsyncSocket::addZeroCopyBuff(folly::IOBuf* ptr) {
+void AsyncSocket::addZeroCopyBuf(folly::IOBuf* ptr) {
uint32_t id = getNextZeroCopyBuffId();
idZeroCopyBufPtrMap_[id] = ptr;
- idZeroCopyBufPtrToBufMap_[ptr].first++;
+ idZeroCopyBufInfoMap_[ptr].count_++;
}
-void AsyncSocket::releaseZeroCopyBuff(uint32_t id) {
+void AsyncSocket::releaseZeroCopyBuf(uint32_t id) {
auto iter = idZeroCopyBufPtrMap_.find(id);
CHECK(iter != idZeroCopyBufPtrMap_.end());
auto ptr = iter->second;
- auto iter1 = idZeroCopyBufPtrToBufMap_.find(ptr);
- CHECK(iter1 != idZeroCopyBufPtrToBufMap_.end());
- if (0 == --iter1->second.first) {
- idZeroCopyBufPtrToBufMap_.erase(iter1);
+ auto iter1 = idZeroCopyBufInfoMap_.find(ptr);
+ CHECK(iter1 != idZeroCopyBufInfoMap_.end());
+ if (0 == --iter1->second.count_) {
+ idZeroCopyBufInfoMap_.erase(iter1);
}
}
-void AsyncSocket::setZeroCopyBuff(std::unique_ptr<folly::IOBuf>&& buf) {
+void AsyncSocket::setZeroCopyBuf(std::unique_ptr<folly::IOBuf>&& buf) {
folly::IOBuf* ptr = buf.get();
- auto& p = idZeroCopyBufPtrToBufMap_[ptr];
- CHECK(p.second.get() == nullptr);
+ auto& p = idZeroCopyBufInfoMap_[ptr];
+ CHECK(p.buf_.get() == nullptr);
- p.second = std::move(buf);
+ p.buf_ = std::move(buf);
}
-bool AsyncSocket::containsZeroCopyBuff(folly::IOBuf* ptr) {
- return (
- idZeroCopyBufPtrToBufMap_.find(ptr) != idZeroCopyBufPtrToBufMap_.end());
+bool AsyncSocket::containsZeroCopyBuf(folly::IOBuf* ptr) {
+ return (idZeroCopyBufInfoMap_.find(ptr) != idZeroCopyBufInfoMap_.end());
}
bool AsyncSocket::isZeroCopyMsg(const cmsghdr& cmsg) const {
reinterpret_cast<const struct sock_extended_err*>(CMSG_DATA(&cmsg));
uint32_t hi = serr->ee_data;
uint32_t lo = serr->ee_info;
+ // disable zero copy if the buffer was actually copied
+ if ((serr->ee_code & SO_EE_CODE_ZEROCOPY_COPIED) && zeroCopyEnabled_) {
+ VLOG(2) << "AsyncSocket::processZeroCopyMsg(): setting "
+ << "zeroCopyEnabled_ = false due to SO_EE_CODE_ZEROCOPY_COPIED "
+ << "on " << fd_;
+ zeroCopyEnabled_ = false;
+ }
for (uint32_t i = lo; i <= hi; i++) {
- releaseZeroCopyBuff(i);
+ releaseZeroCopyBuf(i);
}
#endif
}
} else if (countWritten == count) {
// done, add the whole buffer
if (isZeroCopyRequest(flags)) {
- addZeroCopyBuff(std::move(ioBuf));
+ addZeroCopyBuf(std::move(ioBuf));
}
// We successfully wrote everything.
// Invoke the callback and return.
} else { // continue writing the next writeReq
// add just the ptr
if (isZeroCopyRequest(flags)) {
- addZeroCopyBuff(ioBuf.get());
+ addZeroCopyBuf(ioBuf.get());
}
if (bufferCallback_) {
bufferCallback_->onEgressBuffered();
}
}
+bool AsyncSocket::isZeroCopyWriteInProgress() const noexcept {
+ eventBase_->dcheckIsInEventBaseThread();
+ return (!idZeroCopyBufPtrMap_.empty());
+}
+
void AsyncSocket::getLocalAddress(folly::SocketAddress* address) const {
cacheLocalAddress();
*address = localAddr_;
*/
void cacheAddresses();
+ /**
+ * Returns true if there is any zero copy write in progress
+ * Needs to be called from within the socket's EVB thread
+ */
+ bool isZeroCopyWriteInProgress() const noexcept;
+
/**
* writeReturn is the total number of bytes written, or WRITE_ERROR on error.
* If no data has been written, 0 is returned.
const iovec* vec,
uint32_t count,
folly::WriteFlags& flags);
- void addZeroCopyBuff(std::unique_ptr<folly::IOBuf>&& buf);
- void addZeroCopyBuff(folly::IOBuf* ptr);
- void setZeroCopyBuff(std::unique_ptr<folly::IOBuf>&& buf);
- bool containsZeroCopyBuff(folly::IOBuf* ptr);
- void releaseZeroCopyBuff(uint32_t id);
+ void addZeroCopyBuf(std::unique_ptr<folly::IOBuf>&& buf);
+ void addZeroCopyBuf(folly::IOBuf* ptr);
+ void setZeroCopyBuf(std::unique_ptr<folly::IOBuf>&& buf);
+ bool containsZeroCopyBuf(folly::IOBuf* ptr);
+ void releaseZeroCopyBuf(uint32_t id);
// a folly::IOBuf can be used in multiple partial requests
- // so we keep a map that maps a buffer id to a raw folly::IOBuf ptr
- // and one more map that adds a ref count for a folly::IOBuf that is either
+ // there is a that maps a buffer id to a raw folly::IOBuf ptr
+ // and another one that adds a ref count for a folly::IOBuf that is either
// the original ptr or nullptr
uint32_t zeroCopyBuffId_{0};
+
+ struct IOBufInfo {
+ uint32_t count_{0};
+ std::unique_ptr<folly::IOBuf> buf_;
+ };
+
std::unordered_map<uint32_t, folly::IOBuf*> idZeroCopyBufPtrMap_;
- std::unordered_map<
- folly::IOBuf*,
- std::pair<uint32_t, std::unique_ptr<folly::IOBuf>>>
- idZeroCopyBufPtrToBufMap_;
+ std::unordered_map<folly::IOBuf*, IOBufInfo> idZeroCopyBufInfoMap_;
StateEnum state_; ///< StateEnum describing current state
uint8_t shutdownFlags_; ///< Shutdown state (ShutdownFlags)