From: Mohammad Husain Date: Wed, 30 Sep 2015 21:32:33 +0000 (-0700) Subject: APIs to determine which end of the socket has closed it X-Git-Tag: deprecate-dynamic-initializer~366 X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=0bf69167f1e7f1fd63e541e9c5ad12b8a759ee62;p=folly.git APIs to determine which end of the socket has closed it Reviewed By: @afrind Differential Revision: D2466921 --- diff --git a/folly/io/async/AsyncSocket.cpp b/folly/io/async/AsyncSocket.cpp index a603e709..6adeb940 100644 --- a/folly/io/async/AsyncSocket.cpp +++ b/folly/io/async/AsyncSocket.cpp @@ -1347,11 +1347,13 @@ void AsyncSocket::handleRead() noexcept { // No more data to read right now. return; } else if (bytesRead == READ_ERROR) { + readErr_ = READ_ERROR; AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR, withAddr("recv() failed"), errno); return failRead(__func__, ex); } else { assert(bytesRead == READ_EOF); + readErr_ = READ_EOF; // EOF shutdownFlags_ |= SHUT_READ; if (!updateEventRegistration(0, EventHandler::READ)) { diff --git a/folly/io/async/AsyncSocket.h b/folly/io/async/AsyncSocket.h index 17e87796..4fd5b625 100644 --- a/folly/io/async/AsyncSocket.h +++ b/folly/io/async/AsyncSocket.h @@ -369,6 +369,16 @@ class AsyncSocket : virtual public AsyncTransportWrapper { return (state_ == StateEnum::CONNECTING); } + virtual bool isClosedByPeer() const { + return (state_ == StateEnum::CLOSED && + (readErr_ == READ_EOF || readErr_ == READ_ERROR)); + } + + virtual bool isClosedBySelf() const { + return (state_ == StateEnum::CLOSED && + (readErr_ != READ_EOF && readErr_ != READ_ERROR)); + } + size_t getAppBytesWritten() const override { return appBytesWritten_; } @@ -546,6 +556,7 @@ class AsyncSocket : virtual public AsyncTransportWrapper { READ_EOF = 0, READ_ERROR = -1, READ_BLOCKING = -2, + READ_NO_ERROR = -3, }; /** @@ -770,6 +781,8 @@ class AsyncSocket : virtual public AsyncTransportWrapper { bool isBufferMovable_{false}; bool peek_{false}; // Peek bytes. + + int8_t readErr_{READ_NO_ERROR}; ///< The read error encountered, if any. }; diff --git a/folly/io/async/AsyncTransport.h b/folly/io/async/AsyncTransport.h index e4a6c29b..3086958b 100644 --- a/folly/io/async/AsyncTransport.h +++ b/folly/io/async/AsyncTransport.h @@ -226,6 +226,7 @@ class AsyncTransport : public DelayedDestruction, public AsyncSocketBase { virtual bool isPending() const { return readable(); } + /** * Determine if transport is connected to the endpoint * diff --git a/folly/io/async/test/AsyncSocketTest2.cpp b/folly/io/async/test/AsyncSocketTest2.cpp index 66edf663..2eae4a15 100644 --- a/folly/io/async/test/AsyncSocketTest2.cpp +++ b/folly/io/async/test/AsyncSocketTest2.cpp @@ -183,6 +183,9 @@ TEST(AsyncSocketTest, ConnectAndWrite) { // Make sure the server got a connection and received the data socket->close(); server.verifyConnection(buf, sizeof(buf)); + + ASSERT_TRUE(socket->isClosedBySelf()); + ASSERT_FALSE(socket->isClosedByPeer()); } /** @@ -210,6 +213,9 @@ TEST(AsyncSocketTest, ConnectNullCallback) { // Make sure the server got a connection and received the data socket->close(); server.verifyConnection(buf, sizeof(buf)); + + ASSERT_TRUE(socket->isClosedBySelf()); + ASSERT_FALSE(socket->isClosedByPeer()); } /** @@ -245,6 +251,9 @@ TEST(AsyncSocketTest, ConnectWriteAndClose) { // Make sure the server got a connection and received the data server.verifyConnection(buf, sizeof(buf)); + + ASSERT_TRUE(socket->isClosedBySelf()); + ASSERT_FALSE(socket->isClosedByPeer()); } /** @@ -274,6 +283,9 @@ TEST(AsyncSocketTest, ConnectAndClose) { // Make sure the connection was aborted CHECK_EQ(ccb.state, STATE_FAILED); + + ASSERT_TRUE(socket->isClosedBySelf()); + ASSERT_FALSE(socket->isClosedByPeer()); } /** @@ -305,6 +317,9 @@ TEST(AsyncSocketTest, ConnectAndCloseNow) { // Make sure the connection was aborted CHECK_EQ(ccb.state, STATE_FAILED); + + ASSERT_TRUE(socket->isClosedBySelf()); + ASSERT_FALSE(socket->isClosedByPeer()); } /** @@ -344,6 +359,9 @@ TEST(AsyncSocketTest, ConnectWriteAndCloseNow) { CHECK_EQ(ccb.state, STATE_FAILED); CHECK_EQ(wcb.state, STATE_FAILED); + + ASSERT_TRUE(socket->isClosedBySelf()); + ASSERT_FALSE(socket->isClosedByPeer()); } /** @@ -378,6 +396,9 @@ TEST(AsyncSocketTest, ConnectAndRead) { CHECK_EQ(rcb.buffers.size(), 1); CHECK_EQ(rcb.buffers[0].length, sizeof(buf)); CHECK_EQ(memcmp(rcb.buffers[0].buffer, buf, sizeof(buf)), 0); + + ASSERT_FALSE(socket->isClosedBySelf()); + ASSERT_FALSE(socket->isClosedByPeer()); } /** @@ -413,6 +434,9 @@ TEST(AsyncSocketTest, ConnectReadAndClose) { CHECK_EQ(ccb.state, STATE_FAILED); // we aborted the close attempt CHECK_EQ(rcb.buffers.size(), 0); CHECK_EQ(rcb.state, STATE_SUCCEEDED); // this indicates EOF + + ASSERT_TRUE(socket->isClosedBySelf()); + ASSERT_FALSE(socket->isClosedByPeer()); } /** @@ -471,6 +495,9 @@ TEST(AsyncSocketTest, ConnectWriteAndRead) { CHECK_EQ(memcmp(buf1, readbuf, sizeof(buf1)), 0); uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf)); CHECK_EQ(bytesRead, 0); + + ASSERT_FALSE(socket->isClosedBySelf()); + ASSERT_TRUE(socket->isClosedByPeer()); } /** @@ -556,6 +583,9 @@ TEST(AsyncSocketTest, ConnectWriteAndShutdownWrite) { CHECK_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf)); CHECK_EQ(memcmp(rcb.buffers[0].buffer, acceptedWbuf, sizeof(acceptedWbuf)), 0); + + ASSERT_FALSE(socket->isClosedBySelf()); + ASSERT_FALSE(socket->isClosedByPeer()); } /** @@ -641,6 +671,9 @@ TEST(AsyncSocketTest, ConnectReadWriteAndShutdownWrite) { // Fully close both sockets acceptedSocket->close(); socket->close(); + + ASSERT_FALSE(socket->isClosedBySelf()); + ASSERT_TRUE(socket->isClosedByPeer()); } /** @@ -729,6 +762,9 @@ TEST(AsyncSocketTest, ConnectReadWriteAndShutdownWriteNow) { // Fully close both sockets acceptedSocket->close(); socket->close(); + + ASSERT_FALSE(socket->isClosedBySelf()); + ASSERT_TRUE(socket->isClosedByPeer()); } // Helper function for use in testConnectOptWrite() @@ -902,6 +938,9 @@ TEST(AsyncSocketTest, WriteNullCallback) { // Make sure the server got a connection and received the data socket->close(); server.verifyConnection(buf, sizeof(buf)); + + ASSERT_TRUE(socket->isClosedBySelf()); + ASSERT_FALSE(socket->isClosedByPeer()); } /** @@ -988,6 +1027,9 @@ TEST(AsyncSocketTest, WritePipeError) { CHECK_EQ(wcb.state, STATE_FAILED); CHECK_EQ(wcb.exception.getType(), AsyncSocketException::INTERNAL_ERROR); + + ASSERT_FALSE(socket->isClosedBySelf()); + ASSERT_FALSE(socket->isClosedByPeer()); } /** @@ -1062,6 +1104,9 @@ TEST(AsyncSocketTest, WriteIOBuf) { acceptedSocket->close(); socket->close(); + + ASSERT_TRUE(socket->isClosedBySelf()); + ASSERT_FALSE(socket->isClosedByPeer()); } TEST(AsyncSocketTest, WriteIOBufCorked) { @@ -1120,6 +1165,9 @@ TEST(AsyncSocketTest, WriteIOBufCorked) { acceptedSocket->close(); socket->close(); + + ASSERT_TRUE(socket->isClosedBySelf()); + ASSERT_FALSE(socket->isClosedByPeer()); } /** @@ -1161,6 +1209,9 @@ TEST(AsyncSocketTest, ZeroLengthWrite) { CHECK_EQ(wcb3.state, STATE_SUCCEEDED); CHECK_EQ(wcb4.state, STATE_SUCCEEDED); rcb.verifyData(buf.get(), len1 + len2); + + ASSERT_TRUE(socket->isClosedBySelf()); + ASSERT_FALSE(socket->isClosedByPeer()); } TEST(AsyncSocketTest, ZeroLengthWritev) { @@ -1200,6 +1251,9 @@ TEST(AsyncSocketTest, ZeroLengthWritev) { CHECK_EQ(wcb.state, STATE_SUCCEEDED); rcb.verifyData(buf.get(), len1 + len2); + + ASSERT_TRUE(socket->isClosedBySelf()); + ASSERT_FALSE(socket->isClosedByPeer()); } /////////////////////////////////////////////////////////////////////////// @@ -1258,6 +1312,9 @@ TEST(AsyncSocketTest, ClosePendingWritesWhileClosing) { ++it) { CHECK_EQ((*it)->state, STATE_FAILED); } + + ASSERT_TRUE(socket->isClosedBySelf()); + ASSERT_FALSE(socket->isClosedByPeer()); } /////////////////////////////////////////////////////////////////////////// @@ -1317,6 +1374,9 @@ TEST(AsyncSocket, ConnectReadImmediateRead) { CHECK_EQ(wcb1.state, STATE_SUCCEEDED); rcb.verifyData(expectedData, expectedDataSz); CHECK_EQ(socket.immediateReadCalled, true); + + ASSERT_FALSE(socket.isClosedBySelf()); + ASSERT_FALSE(socket.isClosedByPeer()); } TEST(AsyncSocket, ConnectReadUninstallRead) { @@ -1368,6 +1428,9 @@ TEST(AsyncSocket, ConnectReadUninstallRead) { * was reset in dataAvailableCallback */ CHECK_EQ(rcb.dataRead(), maxBufferSz); CHECK_EQ(socket.immediateReadCalled, false); + + ASSERT_FALSE(socket.isClosedBySelf()); + ASSERT_FALSE(socket.isClosedByPeer()); } // TODO: