Fix incorrect bytesWritten computation in AsyncSocket
authorTiho Tarnavski <tihot@fb.com>
Tue, 29 Nov 2016 23:41:06 +0000 (15:41 -0800)
committerFacebook Github Bot <facebook-github-bot-bot@fb.com>
Tue, 29 Nov 2016 23:53:28 +0000 (15:53 -0800)
Summary: If a write request is buffered after a partial write, then bytes written is not updated after subsequent write operations (`performWrite`) for the buffered write request (`BytesWriteRequest`). This results in a wrong value for totalBytesWritten_, which is reported in the error callback in case the write request fails.

Reviewed By: yfeldblum

Differential Revision: D4205743

fbshipit-source-id: f77ca55ccfdceda1008c45e72ec093b00bf250e4

folly/io/async/AsyncSocket.cpp
folly/io/async/test/AsyncSocketTest.h
folly/io/async/test/AsyncSocketTest2.cpp

index 729185b89c4f35461fd5cf015a9483e0a645d849..a51e08ead9a13d828594ebf51f1f8fe8bb6c15e1 100644 (file)
@@ -95,8 +95,10 @@ class AsyncSocket::BytesWriteRequest : public AsyncSocket::WriteRequest {
     if (getNext() != nullptr) {
       writeFlags = writeFlags | WriteFlags::CORK;
     }
-    return socket_->performWrite(
+    auto writeResult = socket_->performWrite(
         getOps(), getOpCount(), writeFlags, &opsWritten_, &partialBytes_);
+    bytesWritten_ = writeResult.writeReturn > 0 ? writeResult.writeReturn : 0;
+    return writeResult;
   }
 
   bool isComplete() override {
index 7795792f84934056e43efda9dbaa810ddba6f980..3bae6a087d0870642c546f6523ccc780a7634b2b 100644 (file)
@@ -206,7 +206,7 @@ class TestServer {
  public:
   // Create a TestServer.
   // This immediately starts listening on an ephemeral port.
-  explicit TestServer(bool enableTFO = false) : fd_(-1) {
+  explicit TestServer(bool enableTFO = false, int bufSize = -1) : fd_(-1) {
     namespace fsp = folly::portability::sockets;
     fd_ = fsp::socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
     if (fd_ < 0) {
@@ -246,6 +246,11 @@ class TestServer {
       freeaddrinfo(res);
     };
 
+    if (bufSize > 0) {
+      setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &bufSize, sizeof(bufSize));
+      setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &bufSize, sizeof(bufSize));
+    }
+
     if (bind(fd_, res->ai_addr, res->ai_addrlen)) {
       throw folly::AsyncSocketException(
           folly::AsyncSocketException::INTERNAL_ERROR,
index afe23fa134eb37f37e037373df6f4e627f6fc1e9..c42406a8a9e6d3a29dc7cdfb7ababf33ca4a089e 100644 (file)
@@ -1090,6 +1090,72 @@ TEST(AsyncSocketTest, WritePipeError) {
   ASSERT_FALSE(socket->isClosedByPeer());
 }
 
+/**
+ * Test that bytes written is correctly computed in case of write failure
+ */
+TEST(AsyncSocketTest, WriteErrorCallbackBytesWritten) {
+  // Send and receive buffer sizes for the sockets.
+  const int sockBufSize = 8 * 1024;
+
+  TestServer server(false, sockBufSize);
+
+  AsyncSocket::OptionMap options{
+      {{SOL_SOCKET, SO_SNDBUF}, sockBufSize},
+      {{SOL_SOCKET, SO_RCVBUF}, sockBufSize},
+      {{IPPROTO_TCP, TCP_NODELAY}, 1},
+  };
+
+  // The current thread will be used by the receiver - use a separate thread
+  // for the sender.
+  EventBase senderEvb;
+  std::thread senderThread([&]() { senderEvb.loopForever(); });
+
+  ConnCallback ccb;
+  std::shared_ptr<AsyncSocket> socket;
+
+  senderEvb.runInEventBaseThreadAndWait([&]() {
+    socket = AsyncSocket::newSocket(&senderEvb);
+    socket->connect(&ccb, server.getAddress(), 30, options);
+  });
+
+  // accept the socket on the server side
+  std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
+
+  // Send a big (45KB) write so that it is partially written. The first write
+  // is 16KB (8KB on both sides) and subsequent writes are 8KB each. Reading
+  // just under 24KB would cause 3-4 writes for the total of 32-40KB in the
+  // following sequence: 16KB + 8KB + 8KB (+ 8KB). This ensures that not all
+  // bytes are written when the socket is reset. Having at least 3 writes
+  // ensures that the total size (45KB) would be exceeed in case of overcounting
+  // based on the initial write size of 16KB.
+  constexpr size_t sendSize = 45 * 1024;
+  auto const sendBuf = std::vector<char>(sendSize, 'a');
+
+  WriteCallback wcb;
+
+  senderEvb.runInEventBaseThreadAndWait(
+      [&]() { socket->write(&wcb, sendBuf.data(), sendSize); });
+
+  // Reading 20KB would cause three additional writes of 8KB, but less
+  // than 45KB total, so the socket is reset before all bytes are written.
+  constexpr size_t recvSize = 20 * 1024;
+  uint8_t recvBuf[recvSize];
+  int bytesRead = acceptedSocket->readAll(recvBuf, sizeof(recvBuf));
+
+  acceptedSocket->closeWithReset();
+
+  senderEvb.terminateLoopSoon();
+  senderThread.join();
+
+  LOG(INFO) << "Bytes written: " << wcb.bytesWritten;
+
+  ASSERT_EQ(STATE_FAILED, wcb.state);
+  ASSERT_GE(wcb.bytesWritten, bytesRead);
+  ASSERT_LE(wcb.bytesWritten, sendSize);
+  ASSERT_EQ(recvSize, bytesRead);
+  ASSERT(32 * 1024 == wcb.bytesWritten || 40 * 1024 == wcb.bytesWritten);
+}
+
 /**
  * Test writing a mix of simple buffers and IOBufs
  */