+bool AsyncSocket::setZeroCopy(bool enable) {
+ if (msgErrQueueSupported) {
+ zeroCopyVal_ = enable;
+
+ if (fd_ < 0) {
+ return false;
+ }
+
+ int val = enable ? 1 : 0;
+ int ret = setsockopt(fd_, SOL_SOCKET, SO_ZEROCOPY, &val, sizeof(val));
+
+ // if enable == false, set zeroCopyEnabled_ = false regardless
+ // if SO_ZEROCOPY is set or not
+ if (!enable) {
+ zeroCopyEnabled_ = enable;
+ return true;
+ }
+
+ /* if the setsockopt failed, try to see if the socket inherited the flag
+ * since we cannot set SO_ZEROCOPY on a socket s = accept
+ */
+ if (ret) {
+ val = 0;
+ socklen_t optlen = sizeof(val);
+ ret = getsockopt(fd_, SOL_SOCKET, SO_ZEROCOPY, &val, &optlen);
+
+ if (!ret) {
+ enable = val ? true : false;
+ }
+ }
+
+ if (!ret) {
+ zeroCopyEnabled_ = enable;
+
+ return true;
+ }
+ }
+
+ return false;
+}
+
+void AsyncSocket::setZeroCopyWriteChainThreshold(size_t threshold) {
+ zeroCopyWriteChainThreshold_ = threshold;
+}
+
+bool AsyncSocket::isZeroCopyRequest(WriteFlags flags) {
+ return (zeroCopyEnabled_ && isSet(flags, WriteFlags::WRITE_MSG_ZEROCOPY));
+}
+
+void AsyncSocket::adjustZeroCopyFlags(
+ folly::IOBuf* buf,
+ folly::WriteFlags& flags) {
+ if (zeroCopyEnabled_ && zeroCopyWriteChainThreshold_ && buf) {
+ if (buf->computeChainDataLength() >= zeroCopyWriteChainThreshold_) {
+ flags |= folly::WriteFlags::WRITE_MSG_ZEROCOPY;
+ } else {
+ flags = unSet(flags, folly::WriteFlags::WRITE_MSG_ZEROCOPY);
+ }
+ }
+}
+
+void AsyncSocket::adjustZeroCopyFlags(
+ const iovec* vec,
+ uint32_t count,
+ folly::WriteFlags& flags) {
+ if (zeroCopyEnabled_ && zeroCopyWriteChainThreshold_) {
+ count = std::min<uint32_t>(count, kIovMax);
+ size_t sum = 0;
+ for (uint32_t i = 0; i < count; ++i) {
+ const iovec* v = vec + i;
+ sum += v->iov_len;
+ }
+
+ if (sum >= zeroCopyWriteChainThreshold_) {
+ flags |= folly::WriteFlags::WRITE_MSG_ZEROCOPY;
+ } else {
+ flags = unSet(flags, folly::WriteFlags::WRITE_MSG_ZEROCOPY);
+ }
+ }
+}
+
+void AsyncSocket::addZeroCopyBuff(std::unique_ptr<folly::IOBuf>&& buf) {
+ uint32_t id = getNextZeroCopyBuffId();
+ folly::IOBuf* ptr = buf.get();
+
+ idZeroCopyBufPtrMap_[id] = ptr;
+ auto& p = idZeroCopyBufPtrToBufMap_[ptr];
+ p.first++;
+ CHECK(p.second.get() == nullptr);
+ p.second = std::move(buf);
+}
+
+void AsyncSocket::addZeroCopyBuff(folly::IOBuf* ptr) {
+ uint32_t id = getNextZeroCopyBuffId();
+ idZeroCopyBufPtrMap_[id] = ptr;
+
+ idZeroCopyBufPtrToBufMap_[ptr].first++;
+}
+
+void AsyncSocket::releaseZeroCopyBuff(uint32_t id) {
+ auto iter = idZeroCopyBufPtrMap_.find(id);
+ CHECK(iter != idZeroCopyBufPtrMap_.end());
+ auto ptr = iter->second;
+ auto iter1 = idZeroCopyBufPtrToBufMap_.find(ptr);
+ CHECK(iter1 != idZeroCopyBufPtrToBufMap_.end());
+ if (0 == --iter1->second.first) {
+ idZeroCopyBufPtrToBufMap_.erase(iter1);
+ }
+}
+
+void AsyncSocket::setZeroCopyBuff(std::unique_ptr<folly::IOBuf>&& buf) {
+ folly::IOBuf* ptr = buf.get();
+ auto& p = idZeroCopyBufPtrToBufMap_[ptr];
+ CHECK(p.second.get() == nullptr);
+
+ p.second = std::move(buf);
+}
+
+bool AsyncSocket::containsZeroCopyBuff(folly::IOBuf* ptr) {
+ return (
+ idZeroCopyBufPtrToBufMap_.find(ptr) != idZeroCopyBufPtrToBufMap_.end());
+}
+
+bool AsyncSocket::isZeroCopyMsg(const cmsghdr& cmsg) const {
+#ifdef MSG_ERRQUEUE
+ if (zeroCopyEnabled_ &&
+ ((cmsg.cmsg_level == SOL_IP && cmsg.cmsg_type == IP_RECVERR) ||
+ (cmsg.cmsg_level == SOL_IPV6 && cmsg.cmsg_type == IPV6_RECVERR))) {
+ const struct sock_extended_err* serr =
+ reinterpret_cast<const struct sock_extended_err*>(CMSG_DATA(&cmsg));
+ return (
+ (serr->ee_errno == 0) && (serr->ee_origin == SO_EE_ORIGIN_ZEROCOPY));
+ }
+#endif
+ return false;
+}
+
+void AsyncSocket::processZeroCopyMsg(const cmsghdr& cmsg) {
+#ifdef MSG_ERRQUEUE
+ const struct sock_extended_err* serr =
+ reinterpret_cast<const struct sock_extended_err*>(CMSG_DATA(&cmsg));
+ uint32_t hi = serr->ee_data;
+ uint32_t lo = serr->ee_info;
+
+ for (uint32_t i = lo; i <= hi; i++) {
+ releaseZeroCopyBuff(i);
+ }
+#endif
+}
+