APIs to determine which end of the socket has closed it
authorMohammad Husain <mhusain@fb.com>
Wed, 30 Sep 2015 21:32:33 +0000 (14:32 -0700)
committerfacebook-github-bot-1 <folly-bot@fb.com>
Wed, 30 Sep 2015 22:20:20 +0000 (15:20 -0700)
Reviewed By: @afrind

Differential Revision: D2466921

folly/io/async/AsyncSocket.cpp
folly/io/async/AsyncSocket.h
folly/io/async/AsyncTransport.h
folly/io/async/test/AsyncSocketTest2.cpp

index a603e709f67b455d4f8d6f3d93e1a5e35d6dd560..6adeb940196546a95ec5b91ac6a9b3b1a29efa54 100644 (file)
@@ -1347,11 +1347,13 @@ void AsyncSocket::handleRead() noexcept {
         // No more data to read right now.
         return;
     } else if (bytesRead == READ_ERROR) {
+      readErr_ = READ_ERROR;
       AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
                              withAddr("recv() failed"), errno);
       return failRead(__func__, ex);
     } else {
       assert(bytesRead == READ_EOF);
+      readErr_ = READ_EOF;
       // EOF
       shutdownFlags_ |= SHUT_READ;
       if (!updateEventRegistration(0, EventHandler::READ)) {
index 17e87796f0cbefaa7e79a3ae2dc7022424821827..4fd5b625abaae7e28136e79d638ef57c4f534a5b 100644 (file)
@@ -369,6 +369,16 @@ class AsyncSocket : virtual public AsyncTransportWrapper {
     return (state_ == StateEnum::CONNECTING);
   }
 
+  virtual bool isClosedByPeer() const {
+    return (state_ == StateEnum::CLOSED &&
+            (readErr_ == READ_EOF || readErr_ == READ_ERROR));
+  }
+
+  virtual bool isClosedBySelf() const {
+    return (state_ == StateEnum::CLOSED &&
+            (readErr_ != READ_EOF && readErr_ != READ_ERROR));
+  }
+
   size_t getAppBytesWritten() const override {
     return appBytesWritten_;
   }
@@ -546,6 +556,7 @@ class AsyncSocket : virtual public AsyncTransportWrapper {
     READ_EOF = 0,
     READ_ERROR = -1,
     READ_BLOCKING = -2,
+    READ_NO_ERROR = -3,
   };
 
   /**
@@ -770,6 +781,8 @@ class AsyncSocket : virtual public AsyncTransportWrapper {
   bool isBufferMovable_{false};
 
   bool peek_{false}; // Peek bytes.
+
+  int8_t readErr_{READ_NO_ERROR};      ///< The read error encountered, if any.
 };
 
 
index e4a6c29b32b997ebb4236414ec0315ca77f0b7b9..3086958b7ab9178e75924c0b521ee32475591cc0 100644 (file)
@@ -226,6 +226,7 @@ class AsyncTransport : public DelayedDestruction, public AsyncSocketBase {
   virtual bool isPending() const {
     return readable();
   }
+
   /**
    * Determine if transport is connected to the endpoint
    *
index 66edf663516d8cf2ba1b570fe0a7776d6594e1f7..2eae4a155259aee8a6e2d6adc94810f81d8c8e2e 100644 (file)
@@ -183,6 +183,9 @@ TEST(AsyncSocketTest, ConnectAndWrite) {
   // Make sure the server got a connection and received the data
   socket->close();
   server.verifyConnection(buf, sizeof(buf));
+
+  ASSERT_TRUE(socket->isClosedBySelf());
+  ASSERT_FALSE(socket->isClosedByPeer());
 }
 
 /**
@@ -210,6 +213,9 @@ TEST(AsyncSocketTest, ConnectNullCallback) {
   // Make sure the server got a connection and received the data
   socket->close();
   server.verifyConnection(buf, sizeof(buf));
+
+  ASSERT_TRUE(socket->isClosedBySelf());
+  ASSERT_FALSE(socket->isClosedByPeer());
 }
 
 /**
@@ -245,6 +251,9 @@ TEST(AsyncSocketTest, ConnectWriteAndClose) {
 
   // Make sure the server got a connection and received the data
   server.verifyConnection(buf, sizeof(buf));
+
+  ASSERT_TRUE(socket->isClosedBySelf());
+  ASSERT_FALSE(socket->isClosedByPeer());
 }
 
 /**
@@ -274,6 +283,9 @@ TEST(AsyncSocketTest, ConnectAndClose) {
 
   // Make sure the connection was aborted
   CHECK_EQ(ccb.state, STATE_FAILED);
+
+  ASSERT_TRUE(socket->isClosedBySelf());
+  ASSERT_FALSE(socket->isClosedByPeer());
 }
 
 /**
@@ -305,6 +317,9 @@ TEST(AsyncSocketTest, ConnectAndCloseNow) {
 
   // Make sure the connection was aborted
   CHECK_EQ(ccb.state, STATE_FAILED);
+
+  ASSERT_TRUE(socket->isClosedBySelf());
+  ASSERT_FALSE(socket->isClosedByPeer());
 }
 
 /**
@@ -344,6 +359,9 @@ TEST(AsyncSocketTest, ConnectWriteAndCloseNow) {
 
   CHECK_EQ(ccb.state, STATE_FAILED);
   CHECK_EQ(wcb.state, STATE_FAILED);
+
+  ASSERT_TRUE(socket->isClosedBySelf());
+  ASSERT_FALSE(socket->isClosedByPeer());
 }
 
 /**
@@ -378,6 +396,9 @@ TEST(AsyncSocketTest, ConnectAndRead) {
   CHECK_EQ(rcb.buffers.size(), 1);
   CHECK_EQ(rcb.buffers[0].length, sizeof(buf));
   CHECK_EQ(memcmp(rcb.buffers[0].buffer, buf, sizeof(buf)), 0);
+
+  ASSERT_FALSE(socket->isClosedBySelf());
+  ASSERT_FALSE(socket->isClosedByPeer());
 }
 
 /**
@@ -413,6 +434,9 @@ TEST(AsyncSocketTest, ConnectReadAndClose) {
   CHECK_EQ(ccb.state, STATE_FAILED); // we aborted the close attempt
   CHECK_EQ(rcb.buffers.size(), 0);
   CHECK_EQ(rcb.state, STATE_SUCCEEDED); // this indicates EOF
+
+  ASSERT_TRUE(socket->isClosedBySelf());
+  ASSERT_FALSE(socket->isClosedByPeer());
 }
 
 /**
@@ -471,6 +495,9 @@ TEST(AsyncSocketTest, ConnectWriteAndRead) {
   CHECK_EQ(memcmp(buf1, readbuf, sizeof(buf1)), 0);
   uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
   CHECK_EQ(bytesRead, 0);
+
+  ASSERT_FALSE(socket->isClosedBySelf());
+  ASSERT_TRUE(socket->isClosedByPeer());
 }
 
 /**
@@ -556,6 +583,9 @@ TEST(AsyncSocketTest, ConnectWriteAndShutdownWrite) {
   CHECK_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
   CHECK_EQ(memcmp(rcb.buffers[0].buffer,
                            acceptedWbuf, sizeof(acceptedWbuf)), 0);
+
+  ASSERT_FALSE(socket->isClosedBySelf());
+  ASSERT_FALSE(socket->isClosedByPeer());
 }
 
 /**
@@ -641,6 +671,9 @@ TEST(AsyncSocketTest, ConnectReadWriteAndShutdownWrite) {
   // Fully close both sockets
   acceptedSocket->close();
   socket->close();
+
+  ASSERT_FALSE(socket->isClosedBySelf());
+  ASSERT_TRUE(socket->isClosedByPeer());
 }
 
 /**
@@ -729,6 +762,9 @@ TEST(AsyncSocketTest, ConnectReadWriteAndShutdownWriteNow) {
   // Fully close both sockets
   acceptedSocket->close();
   socket->close();
+
+  ASSERT_FALSE(socket->isClosedBySelf());
+  ASSERT_TRUE(socket->isClosedByPeer());
 }
 
 // Helper function for use in testConnectOptWrite()
@@ -902,6 +938,9 @@ TEST(AsyncSocketTest, WriteNullCallback) {
   // Make sure the server got a connection and received the data
   socket->close();
   server.verifyConnection(buf, sizeof(buf));
+
+  ASSERT_TRUE(socket->isClosedBySelf());
+  ASSERT_FALSE(socket->isClosedByPeer());
 }
 
 /**
@@ -988,6 +1027,9 @@ TEST(AsyncSocketTest, WritePipeError) {
   CHECK_EQ(wcb.state, STATE_FAILED);
   CHECK_EQ(wcb.exception.getType(),
                     AsyncSocketException::INTERNAL_ERROR);
+
+  ASSERT_FALSE(socket->isClosedBySelf());
+  ASSERT_FALSE(socket->isClosedByPeer());
 }
 
 /**
@@ -1062,6 +1104,9 @@ TEST(AsyncSocketTest, WriteIOBuf) {
 
   acceptedSocket->close();
   socket->close();
+
+  ASSERT_TRUE(socket->isClosedBySelf());
+  ASSERT_FALSE(socket->isClosedByPeer());
 }
 
 TEST(AsyncSocketTest, WriteIOBufCorked) {
@@ -1120,6 +1165,9 @@ TEST(AsyncSocketTest, WriteIOBufCorked) {
 
   acceptedSocket->close();
   socket->close();
+
+  ASSERT_TRUE(socket->isClosedBySelf());
+  ASSERT_FALSE(socket->isClosedByPeer());
 }
 
 /**
@@ -1161,6 +1209,9 @@ TEST(AsyncSocketTest, ZeroLengthWrite) {
   CHECK_EQ(wcb3.state, STATE_SUCCEEDED);
   CHECK_EQ(wcb4.state, STATE_SUCCEEDED);
   rcb.verifyData(buf.get(), len1 + len2);
+
+  ASSERT_TRUE(socket->isClosedBySelf());
+  ASSERT_FALSE(socket->isClosedByPeer());
 }
 
 TEST(AsyncSocketTest, ZeroLengthWritev) {
@@ -1200,6 +1251,9 @@ TEST(AsyncSocketTest, ZeroLengthWritev) {
 
   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
   rcb.verifyData(buf.get(), len1 + len2);
+
+  ASSERT_TRUE(socket->isClosedBySelf());
+  ASSERT_FALSE(socket->isClosedByPeer());
 }
 
 ///////////////////////////////////////////////////////////////////////////
@@ -1258,6 +1312,9 @@ TEST(AsyncSocketTest, ClosePendingWritesWhileClosing) {
        ++it) {
     CHECK_EQ((*it)->state, STATE_FAILED);
   }
+
+  ASSERT_TRUE(socket->isClosedBySelf());
+  ASSERT_FALSE(socket->isClosedByPeer());
 }
 
 ///////////////////////////////////////////////////////////////////////////
@@ -1317,6 +1374,9 @@ TEST(AsyncSocket, ConnectReadImmediateRead) {
   CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
   rcb.verifyData(expectedData, expectedDataSz);
   CHECK_EQ(socket.immediateReadCalled, true);
+
+  ASSERT_FALSE(socket.isClosedBySelf());
+  ASSERT_FALSE(socket.isClosedByPeer());
 }
 
 TEST(AsyncSocket, ConnectReadUninstallRead) {
@@ -1368,6 +1428,9 @@ TEST(AsyncSocket, ConnectReadUninstallRead) {
    * was reset in dataAvailableCallback */
   CHECK_EQ(rcb.dataRead(), maxBufferSz);
   CHECK_EQ(socket.immediateReadCalled, false);
+
+  ASSERT_FALSE(socket.isClosedBySelf());
+  ASSERT_FALSE(socket.isClosedByPeer());
 }
 
 // TODO: