2 * Copyright 2010-present Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #include <folly/io/async/test/AsyncSocketTest2.h>
19 #include <folly/ConstexprMath.h>
20 #include <folly/ExceptionWrapper.h>
21 #include <folly/Random.h>
22 #include <folly/SocketAddress.h>
23 #include <folly/io/async/AsyncSocket.h>
24 #include <folly/io/async/AsyncTimeout.h>
25 #include <folly/io/async/EventBase.h>
27 #include <folly/experimental/TestUtil.h>
28 #include <folly/io/IOBuf.h>
29 #include <folly/io/async/test/AsyncSocketTest.h>
30 #include <folly/io/async/test/Util.h>
31 #include <folly/portability/GMock.h>
32 #include <folly/portability/GTest.h>
33 #include <folly/portability/Sockets.h>
34 #include <folly/portability/Unistd.h>
35 #include <folly/test/SocketAddressTestHelper.h>
37 #include <boost/scoped_array.hpp>
39 #include <sys/types.h>
43 using namespace boost;
50 using std::unique_ptr;
51 using std::chrono::milliseconds;
52 using boost::scoped_array;
54 using namespace folly;
55 using namespace folly::test;
56 using namespace testing;
58 namespace fsp = folly::portability::sockets;
// AsyncTimeout subclass that performs a deferred writeChain() on an
// AsyncSocket from its timeoutExpired() hook.
// NOTE(review): several lines of this class (some member initializers, the
// statement guarding shutdownWrite(), and the remaining member declarations)
// are not visible in this excerpt.
60 class DelayedWrite: public AsyncTimeout {
// The timeout is attached to the socket's EventBase; `bufs` is moved into
// the object so it can be written later.
62 DelayedWrite(const std::shared_ptr<AsyncSocket>& socket,
63 unique_ptr<IOBuf>&& bufs, AsyncTransportWrapper::WriteCallback* wcb,
64 bool cork, bool lastWrite = false):
65 AsyncTimeout(socket->getEventBase()),
67 bufs_(std::move(bufs)),
70 lastWrite_(lastWrite) {}
// Writes the stored buffer chain with WriteFlags::CORK when cork_ is set and
// WriteFlags::NONE otherwise. bufs_ is moved into the call, so it is consumed
// by the first expiry.
73 void timeoutExpired() noexcept override {
74 WriteFlags flags = cork_ ? WriteFlags::CORK : WriteFlags::NONE;
75 socket_->writeChain(wcb_, std::move(bufs_), flags);
// NOTE(review): presumably only executed when lastWrite_ is true; the
// guarding line is not visible here - confirm.
77 socket_->shutdownWrite();
// Socket to write to, the pending data, and the callback passed to
// writeChain().
81 std::shared_ptr<AsyncSocket> socket_;
82 unique_ptr<IOBuf> bufs_;
83 AsyncTransportWrapper::WriteCallback* wcb_;
88 ///////////////////////////////////////////////////////////////////////////
90 ///////////////////////////////////////////////////////////////////////////
93 * Test connecting to a server
// Connects to the test server with a 30ms connect timeout and checks the
// callback state, the measured connect time and the stored timeout value.
// NOTE(review): some statements of this test (e.g. the declarations of
// `server`, `evb` and `cb`) are not visible in this excerpt.
95 TEST(AsyncSocketTest, Connect) {
96 // Start listening on a local port
99 // Connect using an AsyncSocket
101 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
103 socket->connect(&cb, server.getAddress(), 30);
// The connect callback must have reported success.
107 ASSERT_EQ(cb.state, STATE_SUCCEEDED);
// The connect time must be non-negative, and getConnectTimeout() must echo
// the value passed to connect().
108 EXPECT_LE(0, socket->getConnectTime().count());
109 EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
// Parameter type for the AsyncSocketConnectTest value-parameterized tests.
// The enumerators are on lines not visible in this excerpt; DISABLED and
// ENABLED are the values referenced elsewhere in this file.
// NOTE(review): "TFO" presumably stands for TCP Fast Open - confirm.
112 enum class TFOState {
// Value-parameterized gtest fixture: tests declared with
// TEST_P(AsyncSocketConnectTest, ...) read their TFOState via GetParam().
117 class AsyncSocketConnectTest : public ::testing::TestWithParam<TFOState> {};
// Builds the list of TFOState values used to instantiate
// AsyncSocketConnectTest.
// NOTE(review): the lines between the two emplace_back() calls and the
// return statement are not visible here; ENABLED may be added only under some
// build condition - confirm.
119 std::vector<TFOState> getTestingValues() {
120 std::vector<TFOState> vals;
121 vals.emplace_back(TFOState::DISABLED);
124 vals.emplace_back(TFOState::ENABLED);
// Instantiates the AsyncSocketConnectTest suite once for every value
// returned by getTestingValues().
// NOTE(review): the instantiation-name argument is on a line not visible in
// this excerpt.
129 INSTANTIATE_TEST_CASE_P(
131 AsyncSocketConnectTest,
132 ::testing::ValuesIn(getTestingValues()));
135 * Test connecting to a server that isn't listening
// Connects to 127.0.0.1:65535, where no listener is expected, and checks that
// the connect callback fails with AsyncSocketException::NOT_OPEN.
// NOTE(review): some statements of this test (e.g. the declarations of `evb`
// and `cb`) are not visible in this excerpt.
137 TEST(AsyncSocketTest, ConnectRefused) {
140 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
142 // Hopefully nothing is actually listening on this address
143 folly::SocketAddress addr("127.0.0.1", 65535);
145 socket->connect(&cb, addr, 30);
// The failure must be reported through the callback with type NOT_OPEN.
149 EXPECT_EQ(STATE_FAILED, cb.state);
150 EXPECT_EQ(AsyncSocketException::NOT_OPEN, cb.exception.getType());
// Connect time and timeout are still expected to be populated after a
// failed connect.
151 EXPECT_LE(0, socket->getConnectTime().count());
152 EXPECT_EQ(std::chrono::milliseconds(30), socket->getConnectTimeout());
156 * Test connection timeout
// Connects to a remote address on port 65535 with a 1ms timeout and checks
// that the connect fails with TIMED_OUT while the peer address stays
// retrievable.
// NOTE(review): some statements of this test (e.g. the declarations of `evb`,
// `cb` and `host`) are not visible in this excerpt.
158 TEST(AsyncSocketTest, ConnectTimeout) {
161 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
163 // Try connecting to server that won't respond.
165 // This depends somewhat on the network where this test is run.
166 // Hopefully this IP will be routable but unresponsive.
167 // (Alternatively, we could try listening on a local raw socket, but that
168 // normally requires root privileges.)
// Host selection: the IPv6 constant when IPv6 is enabled, otherwise the IPv4
// constant when IPv4 is enabled (the final fallback is on a line not shown).
170 SocketAddressTestHelper::isIPv6Enabled() ?
171 SocketAddressTestHelper::kGooglePublicDnsAAddrIPv6 :
172 SocketAddressTestHelper::isIPv4Enabled() ?
173 SocketAddressTestHelper::kGooglePublicDnsAAddrIPv4 :
175 SocketAddress addr(host, 65535);
177 socket->connect(&cb, addr, 1); // also set a ridiculously small timeout
181 ASSERT_EQ(cb.state, STATE_FAILED);
182 ASSERT_EQ(cb.exception.getType(), AsyncSocketException::TIMED_OUT);
184 // Verify that we can still get the peer address after a timeout.
185 // Use case is if the client was created from a client pool, and we want
186 // to log which peer failed.
187 folly::SocketAddress peer;
188 socket->getPeerAddress(&peer);
189 ASSERT_EQ(peer, addr);
// getConnectTimeout() must echo the 1ms value passed to connect().
190 EXPECT_LE(0, socket->getConnectTime().count());
191 EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(1));
195 * Test writing immediately after connecting, without waiting for connect
// Issues a write() right after connect(), then checks that both callbacks
// succeed and that the server received the bytes. Runs once per TFOState.
// NOTE(review): some statements of this test (local declarations, the body of
// the TFOState::ENABLED branch, and whatever closes the socket) are not
// visible in this excerpt.
198 TEST_P(AsyncSocketConnectTest, ConnectAndWrite) {
203 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
205 if (GetParam() == TFOState::ENABLED) {
210 socket->connect(&ccb, server.getAddress(), 30);
// Payload: a buffer filled with 'a'.
214 memset(buf, 'a', sizeof(buf));
216 socket->write(&wcb, buf, sizeof(buf));
218 // Loop. We don't bother accepting on the server socket yet.
219 // The kernel should be able to buffer the write request so it can succeed.
222 ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
223 ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
225 // Make sure the server got a connection and received the data
227 server.verifyConnection(buf, sizeof(buf));
// The socket is expected to have been closed locally, not by the peer.
229 ASSERT_TRUE(socket->isClosedBySelf());
230 ASSERT_FALSE(socket->isClosedByPeer());
231 EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
235 * Test connecting using a nullptr connect callback.
// Calls connect() with a nullptr connect callback and uses a follow-up write
// to confirm the socket still works. Runs once per TFOState.
// NOTE(review): some statements of this test (local declarations, the body of
// the TFOState::ENABLED branch, and whatever closes the socket) are not
// visible in this excerpt.
237 TEST_P(AsyncSocketConnectTest, ConnectNullCallback) {
242 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
243 if (GetParam() == TFOState::ENABLED) {
247 socket->connect(nullptr, server.getAddress(), 30);
249 // write some data, just so we have some way of verifying
250 // that the socket works correctly after connecting
252 memset(buf, 'a', sizeof(buf));
254 socket->write(&wcb, buf, sizeof(buf));
// With no connect callback, the write callback is the only success signal.
258 ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
260 // Make sure the server got a connection and received the data
262 server.verifyConnection(buf, sizeof(buf));
264 ASSERT_TRUE(socket->isClosedBySelf());
265 ASSERT_FALSE(socket->isClosedByPeer());
269 * Test calling both write() and close() immediately after connecting, without
270 * waiting for connect to finish.
272 * This exercises the STATE_CONNECTING_CLOSING code.
// Issues write() right after connect() and expects both the connect and the
// write callbacks to succeed, with the data delivered to the server. Runs
// once per TFOState.
// NOTE(review): some statements of this test (local declarations, the body of
// the TFOState::ENABLED branch, and the close() call the test name refers to)
// are not visible in this excerpt.
274 TEST_P(AsyncSocketConnectTest, ConnectWriteAndClose) {
279 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
280 if (GetParam() == TFOState::ENABLED) {
284 socket->connect(&ccb, server.getAddress(), 30);
288 memset(buf, 'a', sizeof(buf));
290 socket->write(&wcb, buf, sizeof(buf));
295 // Loop. We don't bother accepting on the server socket yet.
296 // The kernel should be able to buffer the write request so it can succeed.
299 ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
300 ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
302 // Make sure the server got a connection and received the data
303 server.verifyConnection(buf, sizeof(buf));
// The socket must report a local close, not a peer close.
305 ASSERT_TRUE(socket->isClosedBySelf());
306 ASSERT_FALSE(socket->isClosedByPeer());
310 * Test calling close() immediately after connect()
// Starts a connect and expects the connect callback to end in STATE_FAILED
// with the socket reporting that it was closed locally.
// NOTE(review): some statements of this test (local declarations, the rest of
// the early-exit branch, and the close() call itself) are not visible in this
// excerpt.
312 TEST(AsyncSocketTest, ConnectAndClose) {
315 // Connect using an AsyncSocket
317 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
319 socket->connect(&ccb, server.getAddress(), 30);
321 // Hopefully the connect didn't succeed immediately.
322 // If it did, we can't exercise the close-while-connecting code path.
323 if (ccb.state == STATE_SUCCEEDED) {
324 LOG(INFO) << "connect() succeeded immediately; aborting test "
325 "of close-during-connect behavior";
331 // Loop, although there shouldn't be anything to do.
334 // Make sure the connection was aborted
335 ASSERT_EQ(ccb.state, STATE_FAILED);
337 ASSERT_TRUE(socket->isClosedBySelf());
338 ASSERT_FALSE(socket->isClosedByPeer());
342 * Test calling closeNow() immediately after connect()
344 * This should be identical to the normal close behavior.
// Same expectations as ConnectAndClose: the connect callback must end in
// STATE_FAILED and the socket must report a local close.
// NOTE(review): some statements of this test (local declarations, the rest of
// the early-exit branch, and the closeNow() call itself) are not visible in
// this excerpt.
346 TEST(AsyncSocketTest, ConnectAndCloseNow) {
349 // Connect using an AsyncSocket
351 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
353 socket->connect(&ccb, server.getAddress(), 30);
355 // Hopefully the connect didn't succeed immediately.
356 // If it did, we can't exercise the close-while-connecting code path.
357 if (ccb.state == STATE_SUCCEEDED) {
358 LOG(INFO) << "connect() succeeded immediately; aborting test "
359 "of closeNow()-during-connect behavior";
365 // Loop, although there shouldn't be anything to do.
368 // Make sure the connection was aborted
369 ASSERT_EQ(ccb.state, STATE_FAILED);
371 ASSERT_TRUE(socket->isClosedBySelf());
372 ASSERT_FALSE(socket->isClosedByPeer());
376 * Test calling both write() and closeNow() immediately after connecting,
377 * without waiting for connect to finish.
379 * This should abort the pending write.
// Queues a write while the connect is still pending and expects both the
// connect callback and the write callback to end in STATE_FAILED.
// NOTE(review): some statements of this test (local declarations, the rest of
// the early-exit branch, and the closeNow() call itself) are not visible in
// this excerpt.
381 TEST(AsyncSocketTest, ConnectWriteAndCloseNow) {
386 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
388 socket->connect(&ccb, server.getAddress(), 30);
390 // Hopefully the connect didn't succeed immediately.
391 // If it did, we can't exercise the close-while-connecting code path.
392 if (ccb.state == STATE_SUCCEEDED) {
393 LOG(INFO) << "connect() succeeded immediately; aborting test "
394 "of write-during-connect behavior";
400 memset(buf, 'a', sizeof(buf));
402 socket->write(&wcb, buf, sizeof(buf));
407 // Loop, although there shouldn't be anything to do.
// Both the pending connect and the pending write must have been failed.
410 ASSERT_EQ(ccb.state, STATE_FAILED);
411 ASSERT_EQ(wcb.state, STATE_FAILED);
413 ASSERT_TRUE(socket->isClosedBySelf());
414 ASSERT_FALSE(socket->isClosedByPeer());
418 * Test installing a read callback immediately, before connect() finishes.
// Installs a read callback right after connect() and checks that data
// written by the accepted (server-side) socket arrives as one buffer. Runs
// once per TFOState.
// NOTE(review): some statements of this test (local declarations and the body
// of the first TFOState::ENABLED branch) are not visible in this excerpt.
420 TEST_P(AsyncSocketConnectTest, ConnectAndRead) {
425 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
426 if (GetParam() == TFOState::ENABLED) {
431 socket->connect(&ccb, server.getAddress(), 30);
434 socket->setReadCB(&rcb);
436 if (GetParam() == TFOState::ENABLED) {
437 // Trigger a connection
438 socket->writeChain(nullptr, IOBuf::copyBuffer("hey"));
441 // Even though we haven't looped yet, we should be able to accept
442 // the connection and send data to it.
443 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
// Server side: send a buffer of 'a' bytes, flush, then close.
445 memset(buf, 'a', sizeof(buf));
446 acceptedSocket->write(buf, sizeof(buf));
447 acceptedSocket->flush();
448 acceptedSocket->close();
450 // Loop, although there shouldn't be anything to do.
// The read callback must hold exactly one buffer matching what was sent.
453 ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
454 ASSERT_EQ(rcb.buffers.size(), 1);
455 ASSERT_EQ(rcb.buffers[0].length, sizeof(buf));
456 ASSERT_EQ(memcmp(rcb.buffers[0].buffer, buf, sizeof(buf)), 0);
// At this point neither close flag is expected to be set on the client.
458 ASSERT_FALSE(socket->isClosedBySelf());
459 ASSERT_FALSE(socket->isClosedByPeer());
463 * Test installing a read callback and then closing immediately before the
464 * connect attempt finishes.
// Installs a read callback while the connect is pending and expects the
// connect callback to fail while the read callback sees EOF with no data.
// NOTE(review): some statements of this test (local declarations, the rest of
// the early-exit branch, and the close() call itself) are not visible in this
// excerpt.
466 TEST(AsyncSocketTest, ConnectReadAndClose) {
471 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
473 socket->connect(&ccb, server.getAddress(), 30);
475 // Hopefully the connect didn't succeed immediately.
476 // If it did, we can't exercise the close-while-connecting code path.
477 if (ccb.state == STATE_SUCCEEDED) {
478 LOG(INFO) << "connect() succeeded immediately; aborting test "
479 "of read-during-connect behavior";
484 socket->setReadCB(&rcb);
489 // Loop, although there shouldn't be anything to do.
492 ASSERT_EQ(ccb.state, STATE_FAILED); // we aborted the connect attempt
493 ASSERT_EQ(rcb.buffers.size(), 0);
494 ASSERT_EQ(rcb.state, STATE_SUCCEEDED); // this indicates EOF
496 ASSERT_TRUE(socket->isClosedBySelf());
497 ASSERT_FALSE(socket->isClosedByPeer());
501 * Test both writing and installing a read callback immediately,
502 * before connect() finishes.
// Queues a write and installs a read callback right after connect(), then
// checks that data flows in both directions. Runs once per TFOState.
// NOTE(review): some statements of this test (local declarations, the body of
// the TFOState::ENABLED branch, and the event-loop and close calls) are not
// visible in this excerpt.
504 TEST_P(AsyncSocketConnectTest, ConnectWriteAndRead) {
509 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
510 if (GetParam() == TFOState::ENABLED) {
514 socket->connect(&ccb, server.getAddress(), 30);
// Client -> server payload: buf1 filled with 'a'.
518 memset(buf1, 'a', sizeof(buf1));
520 socket->write(&wcb, buf1, sizeof(buf1));
522 // set a read callback
524 socket->setReadCB(&rcb);
526 // Even though we haven't looped yet, we should be able to accept
527 // the connection and send data to it.
528 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
// Server -> client payload: buf2 filled with 'b'.
530 memset(buf2, 'b', sizeof(buf2));
531 acceptedSocket->write(buf2, sizeof(buf2));
532 acceptedSocket->flush();
534 // shut down the write half of acceptedSocket, so that the AsyncSocket
535 // will stop reading and we can break out of the event loop.
536 shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
541 // Make sure the connect succeeded
542 ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
544 // Make sure the AsyncSocket read the data written by the accepted socket
545 ASSERT_EQ(rcb.state, STATE_SUCCEEDED);
546 ASSERT_EQ(rcb.buffers.size(), 1);
547 ASSERT_EQ(rcb.buffers[0].length, sizeof(buf2));
548 ASSERT_EQ(memcmp(rcb.buffers[0].buffer, buf2, sizeof(buf2)), 0);
550 // Close the AsyncSocket so we'll see EOF on acceptedSocket
553 // Make sure the accepted socket saw the data written by the AsyncSocket
554 uint8_t readbuf[sizeof(buf1)];
555 acceptedSocket->readAll(readbuf, sizeof(readbuf));
556 ASSERT_EQ(memcmp(buf1, readbuf, sizeof(buf1)), 0);
// A further read returning 0 bytes means the server side has reached EOF.
557 uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
558 ASSERT_EQ(bytesRead, 0);
// Here the peer close is the one expected to be recorded on the client.
560 ASSERT_FALSE(socket->isClosedBySelf());
561 ASSERT_TRUE(socket->isClosedByPeer());
565 * Test writing to the socket then shutting down writes before the connect
// Queues a write and calls shutdownWrite() while the connect is pending, then
// checks that the write is delivered, the server sees EOF, and the client can
// still read what the server sent.
// NOTE(review): some statements of this test (local declarations, the rest of
// the early-exit branch, the poll() result check and the event-loop calls)
// are not visible in this excerpt.
568 TEST(AsyncSocketTest, ConnectWriteAndShutdownWrite) {
573 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
575 socket->connect(&ccb, server.getAddress(), 30);
577 // Hopefully the connect didn't succeed immediately.
578 // If it did, we can't exercise the write-while-connecting code path.
579 if (ccb.state == STATE_SUCCEEDED) {
580 LOG(INFO) << "connect() succeeded immediately; skipping test";
584 // Ask to write some data
586 memset(wbuf, 'a', sizeof(wbuf));
588 socket->write(&wcb, wbuf, sizeof(wbuf));
589 socket->shutdownWrite();
// NOTE(review): shutdownWrite() is called a second time here, right after the
// call above; this looks redundant - confirm whether it is intentional.
592 socket->shutdownWrite();
594 // Even though we haven't looped yet, we should be able to accept
596 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
598 // Since the connection is still in progress, there should be no data to
599 // read yet. Verify that the accepted socket is not readable.
600 struct pollfd fds[1];
601 fds[0].fd = acceptedSocket->getSocketFD();
602 fds[0].events = POLLIN;
// Zero timeout: poll() returns immediately instead of waiting.
604 int rc = poll(fds, 1, 0);
607 // Write data to the accepted socket
608 uint8_t acceptedWbuf[192];
609 memset(acceptedWbuf, 'b', sizeof(acceptedWbuf));
610 acceptedSocket->write(acceptedWbuf, sizeof(acceptedWbuf));
611 acceptedSocket->flush();
616 // The loop should have completed the connection, written the queued data,
617 // and shutdown writes on the socket.
619 // Check that the connection was completed successfully and that the write
620 // callback succeeded.
621 ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
622 ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
624 // Check that we can read the data that was written to the socket, and that
625 // we see an EOF, since its socket was half-shutdown.
626 uint8_t readbuf[sizeof(wbuf)];
627 acceptedSocket->readAll(readbuf, sizeof(readbuf));
628 ASSERT_EQ(memcmp(wbuf, readbuf, sizeof(wbuf)), 0);
629 uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
630 ASSERT_EQ(bytesRead, 0);
632 // Close the accepted socket. This will cause it to see EOF
633 // and uninstall the read callback when we loop next.
634 acceptedSocket->close();
636 // Install a read callback, then loop again.
638 socket->setReadCB(&rcb);
641 // This loop should have read the data and seen the EOF
642 ASSERT_EQ(rcb.state, STATE_SUCCEEDED);
643 ASSERT_EQ(rcb.buffers.size(), 1);
644 ASSERT_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
645 ASSERT_EQ(memcmp(rcb.buffers[0].buffer,
646 acceptedWbuf, sizeof(acceptedWbuf)), 0);
// Neither close flag is expected to be set on the client at this point.
648 ASSERT_FALSE(socket->isClosedBySelf());
649 ASSERT_FALSE(socket->isClosedByPeer());
653 * Test reading, writing, and shutting down writes before the connect attempt
// Installs a read callback, queues a write and calls shutdownWrite() while
// the connect is pending, then checks that both directions complete.
// NOTE(review): some statements of this test (local declarations, the rest of
// the early-exit branch, the poll() result check and the event-loop and close
// calls) are not visible in this excerpt.
656 TEST(AsyncSocketTest, ConnectReadWriteAndShutdownWrite) {
661 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
663 socket->connect(&ccb, server.getAddress(), 30);
665 // Hopefully the connect didn't succeed immediately.
666 // If it did, we can't exercise the write-while-connecting code path.
667 if (ccb.state == STATE_SUCCEEDED) {
668 LOG(INFO) << "connect() succeeded immediately; skipping test";
672 // Install a read callback
674 socket->setReadCB(&rcb);
676 // Ask to write some data
678 memset(wbuf, 'a', sizeof(wbuf));
680 socket->write(&wcb, wbuf, sizeof(wbuf));
683 socket->shutdownWrite();
685 // Even though we haven't looped yet, we should be able to accept
687 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
689 // Since the connection is still in progress, there should be no data to
690 // read yet. Verify that the accepted socket is not readable.
691 struct pollfd fds[1];
692 fds[0].fd = acceptedSocket->getSocketFD();
693 fds[0].events = POLLIN;
// Zero timeout: poll() returns immediately instead of waiting.
695 int rc = poll(fds, 1, 0);
698 // Write data to the accepted socket
699 uint8_t acceptedWbuf[192];
700 memset(acceptedWbuf, 'b', sizeof(acceptedWbuf));
701 acceptedSocket->write(acceptedWbuf, sizeof(acceptedWbuf));
702 acceptedSocket->flush();
703 // Shutdown writes to the accepted socket. This will cause it to see EOF
704 // and uninstall the read callback.
705 shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
710 // The loop should have completed the connection, written the queued data,
711 // shutdown writes on the socket, read the data we wrote to it, and see the
714 // Check that the connection was completed successfully and that the read
715 // and write callbacks were invoked as expected.
716 ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
717 ASSERT_EQ(rcb.state, STATE_SUCCEEDED);
718 ASSERT_EQ(rcb.buffers.size(), 1);
719 ASSERT_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
720 ASSERT_EQ(memcmp(rcb.buffers[0].buffer,
721 acceptedWbuf, sizeof(acceptedWbuf)), 0);
722 ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
724 // Check that we can read the data that was written to the socket, and that
725 // we see an EOF, since its socket was half-shutdown.
726 uint8_t readbuf[sizeof(wbuf)];
727 acceptedSocket->readAll(readbuf, sizeof(readbuf));
728 ASSERT_EQ(memcmp(wbuf, readbuf, sizeof(wbuf)), 0);
729 uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
730 ASSERT_EQ(bytesRead, 0);
732 // Fully close both sockets
733 acceptedSocket->close();
// The peer close is the one expected to be recorded on the client.
736 ASSERT_FALSE(socket->isClosedBySelf());
737 ASSERT_TRUE(socket->isClosedByPeer());
741 * Test reading, writing, and calling shutdownWriteNow() before the
742 * connect attempt finishes.
// Installs a read callback, queues a write and calls shutdownWriteNow() while
// the connect is pending. The queued write must fail with zero bytes written,
// reads must still work, and the server must see EOF without any data.
// NOTE(review): some statements of this test (local declarations, the rest of
// the early-exit branch, the poll() result check and the event-loop and close
// calls) are not visible in this excerpt.
744 TEST(AsyncSocketTest, ConnectReadWriteAndShutdownWriteNow) {
749 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
751 socket->connect(&ccb, server.getAddress(), 30);
753 // Hopefully the connect didn't succeed immediately.
754 // If it did, we can't exercise the write-while-connecting code path.
755 if (ccb.state == STATE_SUCCEEDED) {
756 LOG(INFO) << "connect() succeeded immediately; skipping test";
760 // Install a read callback
762 socket->setReadCB(&rcb);
764 // Ask to write some data
766 memset(wbuf, 'a', sizeof(wbuf));
768 socket->write(&wcb, wbuf, sizeof(wbuf));
770 // Shutdown writes immediately.
771 // This should immediately discard the data that we just tried to write.
772 socket->shutdownWriteNow();
774 // Verify that writeError() was invoked on the write callback.
775 ASSERT_EQ(wcb.state, STATE_FAILED);
776 ASSERT_EQ(wcb.bytesWritten, 0);
778 // Even though we haven't looped yet, we should be able to accept
780 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
782 // Since the connection is still in progress, there should be no data to
783 // read yet. Verify that the accepted socket is not readable.
784 struct pollfd fds[1];
785 fds[0].fd = acceptedSocket->getSocketFD();
786 fds[0].events = POLLIN;
// Zero timeout: poll() returns immediately instead of waiting.
788 int rc = poll(fds, 1, 0);
791 // Write data to the accepted socket
792 uint8_t acceptedWbuf[192];
793 memset(acceptedWbuf, 'b', sizeof(acceptedWbuf));
794 acceptedSocket->write(acceptedWbuf, sizeof(acceptedWbuf));
795 acceptedSocket->flush();
796 // Shutdown writes to the accepted socket. This will cause it to see EOF
797 // and uninstall the read callback.
798 shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
803 // The loop should have completed the connection, written the queued data,
804 // shutdown writes on the socket, read the data we wrote to it, and see the
807 // Check that the connection was completed successfully and that the read
808 // callback was invoked as expected.
809 ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
810 ASSERT_EQ(rcb.state, STATE_SUCCEEDED);
811 ASSERT_EQ(rcb.buffers.size(), 1);
812 ASSERT_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
813 ASSERT_EQ(memcmp(rcb.buffers[0].buffer,
814 acceptedWbuf, sizeof(acceptedWbuf)), 0);
816 // Since we used shutdownWriteNow(), it should have discarded all pending
817 // write data. Verify we see an immediate EOF when reading from the accepted
819 uint8_t readbuf[sizeof(wbuf)];
820 uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
821 ASSERT_EQ(bytesRead, 0);
823 // Fully close both sockets
824 acceptedSocket->close();
// The peer close is the one expected to be recorded on the client.
827 ASSERT_FALSE(socket->isClosedBySelf());
828 ASSERT_TRUE(socket->isClosedByPeer());
831 // Helper function for use in testConnectOptWrite()
832 // Temporarily disable the read callback
// Removes the socket's read callback and queues its re-installation on the
// socket's EventBase.
//   socket - socket whose read callback is toggled; the bound callable keeps
//            this raw pointer, so the socket has to outlive the queued call.
//   rcb    - read callback to re-install.
833 void tmpDisableReads(AsyncSocket* socket, ReadCallback* rcb) {
834 // Uninstall the read callback
835 socket->setReadCB(nullptr);
836 // Schedule the read callback to be reinstalled via EventBase::runInLoop().
// NOTE(review): this comment used to say "after 1ms", but no delay is passed
// here; the exact scheduling semantics of runInLoop() should be confirmed.
837 socket->getEventBase()->runInLoop(
838 std::bind(&AsyncSocket::setReadCB, socket, rcb));
842 * Test connect+write, then have the connect callback perform another write.
844 * This tests interaction of the optimistic writing after connect with
845 * additional write attempts that occur in the connect callback.
// Shared driver for ConnectCallbackWrite.
//   size1 - size in bytes of the write issued right after connect(); the
//           buffer is filled with 'a'.
//   size2 - size in bytes of the write issued from the connect success
//           callback; the buffer is filled with 'b'.
//   close - NOTE(review): presumably requests a close() before the connect
//           completes; the statements that read it are not visible here.
// After the loop, the bytes collected by the server-side read callback are
// compared against buf1 followed by buf2, and the total must be
// size1 + size2.
// NOTE(review): many statements of this function (local declarations,
// conditionals around the writes and the event-loop call) are not visible in
// this excerpt.
847 void testConnectOptWrite(size_t size1, size_t size2, bool close = false) {
850 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
854 socket->connect(&ccb, server.getAddress(), 30);
856 // Hopefully the connect didn't succeed immediately.
857 // If it did, we can't exercise the optimistic write code path.
858 if (ccb.state == STATE_SUCCEEDED) {
859 LOG(INFO) << "connect() succeeded immediately; aborting test "
860 "of optimistic write behavior";
864 // Tell the connect callback to perform a write when the connect succeeds
866 scoped_array<char> buf2(new char[size2]);
867 memset(buf2.get(), 'b', size2);
// The lambdas capture by reference; everything they use lives in this
// function's scope.
869 ccb.successCallback = [&] { socket->write(&wcb2, buf2.get(), size2); };
870 // Tell the second write callback to close the connection when it is done
871 wcb2.successCallback = [&] { socket->closeNow(); };
874 // Schedule one write() immediately, before the connect finishes
875 scoped_array<char> buf1(new char[size1]);
876 memset(buf1.get(), 'a', size1);
879 socket->write(&wcb1, buf1.get(), size1);
883 // immediately perform a close, before connect() completes
887 // Start reading from the other endpoint after 10ms.
888 // If we're using large buffers, we have to read so that the writes don't
890 std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
// Each time data arrives, tmpDisableReads() removes the read callback and
// queues its re-installation.
892 rcb.dataAvailableCallback = std::bind(tmpDisableReads,
893 acceptedSocket.get(), &rcb);
894 socket->getEventBase()->tryRunAfterDelay(
895 std::bind(&AsyncSocket::setReadCB, acceptedSocket.get(), &rcb),
898 // Loop. We don't bother accepting on the server socket yet.
899 // The kernel should be able to buffer the write request so it can succeed.
902 ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
904 ASSERT_EQ(wcb1.state, STATE_SUCCEEDED);
907 ASSERT_EQ(wcb2.state, STATE_SUCCEEDED);
912 // Make sure the read callback received all of the data
// [start, end) is the byte range of the overall stream covered by the
// current buffer; bytesRead is the running total.
913 size_t bytesRead = 0;
914 for (vector<ReadCallback::Buffer>::const_iterator it = rcb.buffers.begin();
915 it != rcb.buffers.end();
917 size_t start = bytesRead;
918 bytesRead += it->length;
919 size_t end = bytesRead;
// Portion of this buffer that overlaps buf1.
921 size_t cmpLen = min(size1, end) - start;
922 ASSERT_EQ(memcmp(it->buffer, buf1.get() + start, cmpLen), 0);
// Portion of this buffer that overlaps buf2: either the whole buffer
// (start >= size1) or only the tail that follows the end of buf1.
924 if (end > size1 && end <= size1 + size2) {
928 if (start >= size1) {
930 buf2Offset = start - size1;
931 cmpLen = end - start;
933 itOffset = size1 - start;
935 cmpLen = end - size1;
937 ASSERT_EQ(memcmp(it->buffer + itOffset, buf2.get() + buf2Offset,
942 ASSERT_EQ(bytesRead, size1 + size2);
// Runs testConnectOptWrite() over combinations of small, empty and 32 MiB
// writes, with and without the `close` flag.
945 TEST(AsyncSocketTest, ConnectCallbackWrite) {
946 // Test using small writes that should both succeed immediately
947 testConnectOptWrite(100, 200);
949 // Test using a large buffer in the connect callback, that should block
950 const size_t largeSize = 32 * 1024 * 1024;
951 testConnectOptWrite(100, largeSize);
953 // Test using a large initial write
954 testConnectOptWrite(largeSize, 100);
956 // Test using two large buffers
957 testConnectOptWrite(largeSize, largeSize);
959 // Test a small write in the connect callback,
960 // but no immediate write before connect completes
961 testConnectOptWrite(0, 64);
963 // Test a large write in the connect callback,
964 // but no immediate write before connect completes
965 testConnectOptWrite(0, largeSize);
967 // Test connect, a small write, then immediately call close() before connect
969 testConnectOptWrite(211, 0, true);
971 // Test connect, a large immediate write (that will block), then immediately
972 // call close() before connect completes
973 testConnectOptWrite(largeSize, 0, true);
976 ///////////////////////////////////////////////////////////////////////////
977 // write() related tests
978 ///////////////////////////////////////////////////////////////////////////
981 * Test writing using a nullptr callback
// Calls write() with a nullptr write callback on a connected socket and
// checks that the server still receives the data.
// NOTE(review): some statements of this test (local declarations and whatever
// closes the socket) are not visible in this excerpt.
983 TEST(AsyncSocketTest, WriteNullCallback) {
// This newSocket() overload takes the address and a 30ms timeout directly.
988 std::shared_ptr<AsyncSocket> socket =
989 AsyncSocket::newSocket(&evb, server.getAddress(), 30);
990 evb.loop(); // loop until the socket is connected
992 // write() with a nullptr callback
994 memset(buf, 'a', sizeof(buf));
995 socket->write(nullptr, buf, sizeof(buf));
997 evb.loop(); // loop until the data is sent
999 // Make sure the server got a connection and received the data
1001 server.verifyConnection(buf, sizeof(buf));
1003 ASSERT_TRUE(socket->isClosedBySelf());
1004 ASSERT_FALSE(socket->isClosedByPeer());
1008 * Test writing with a send timeout
// Writes 32 MiB with a 200ms send timeout and checks that the write fails
// with TIMED_OUT within 160ms of the requested timeout.
// NOTE(review): some statements of this test (local declarations, the
// event-loop call and the code that sets `start` and `end`) are not visible
// in this excerpt.
1010 TEST(AsyncSocketTest, WriteTimeout) {
1015 std::shared_ptr<AsyncSocket> socket =
1016 AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1017 evb.loop(); // loop until the socket is connected
1019 // write() a large chunk of data, with no-one on the other end reading.
1020 // Tricky: the kernel caches the connection metrics for recently-used
1021 // routes (see tcp_no_metrics_save) so a freshly opened connection can
1022 // have a send buffer size bigger than wmem_default. This makes the test
1023 // flaky on contbuild if writeLength is < wmem_max (20M on our systems).
1024 size_t writeLength = 32 * 1024 * 1024;
// Milliseconds: the same value is wrapped in milliseconds() for the check at
// the end of the test.
1025 uint32_t timeout = 200;
1026 socket->setSendTimeout(timeout);
1027 scoped_array<char> buf(new char[writeLength]);
1028 memset(buf.get(), 'a', writeLength);
1030 socket->write(&wcb, buf.get(), writeLength);
1036 // Make sure the write attempt timed out as requested
1037 ASSERT_EQ(wcb.state, STATE_FAILED);
1038 ASSERT_EQ(wcb.exception.getType(), AsyncSocketException::TIMED_OUT);
1040 // Check that the write timed out within a reasonable period of time.
1041 // We don't check for exactly the specified timeout, since AsyncSocket only
1042 // times out when it hasn't made progress for that period of time.
1044 // On linux, the first write sends a few hundred kb of data, then blocks for
1045 // writability, and then unblocks again after 40ms and is able to write
1046 // another smaller chunk of data before blocking permanently. Therefore it
1047 // doesn't time out until 40ms + timeout.
1049 // I haven't fully verified the cause of this, but I believe it probably
1050 // occurs because the receiving end delays sending an ack for up to 40ms.
1051 // (This is the default value for TCP_DELACK_MIN.) Once the sender receives
1052 // the ack, it can send some more data. However, after that point the
1053 // receiver's kernel buffer is full. This 40ms delay happens even with
1054 // TCP_NODELAY and TCP_QUICKACK enabled on both endpoints. However, the
1055 // kernel may be automatically disabling TCP_QUICKACK after receiving some
1058 // For now, we simply check that the timeout occurred within 160ms of
1059 // the requested value.
1060 T_CHECK_TIMEOUT(start, end, milliseconds(timeout), milliseconds(160));
1064 * Test writing to a socket that the remote endpoint has closed
// Writes 32 MiB to a connection whose server side was accepted and closed
// straight away, and checks that the write fails with INTERNAL_ERROR.
// NOTE(review): some statements of this test (local declarations and the
// event-loop call after write()) are not visible in this excerpt.
1066 TEST(AsyncSocketTest, WritePipeError) {
1071 std::shared_ptr<AsyncSocket> socket =
1072 AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1073 socket->setSendTimeout(1000);
1074 evb.loop(); // loop until the socket is connected
1076 // accept and immediately close the socket
1077 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
1078 acceptedSocket->close();
1080 // write() a large chunk of data
1081 size_t writeLength = 32 * 1024 * 1024;
1082 scoped_array<char> buf(new char[writeLength]);
1083 memset(buf.get(), 'a', writeLength);
1085 socket->write(&wcb, buf.get(), writeLength);
1089 // Make sure the write failed.
1090 // It would be nice if AsyncSocketException could convey the errno value,
1091 // so that we could check for EPIPE
1092 ASSERT_EQ(wcb.state, STATE_FAILED);
1093 ASSERT_EQ(wcb.exception.getType(),
1094 AsyncSocketException::INTERNAL_ERROR);
// A write failure is expected to set neither close flag.
1096 ASSERT_FALSE(socket->isClosedBySelf());
1097 ASSERT_FALSE(socket->isClosedByPeer());
1101 * Test writing to a socket that has its read side closed
// After the client half-closes with shutdownWrite(), checks that the accepted
// (server-side) socket can still write even though its read side saw EOF.
// NOTE(review): some statements of this test (local declarations and the
// event-loop calls) are not visible in this excerpt.
1103 TEST(AsyncSocketTest, WriteAfterReadEOF) {
1108 std::shared_ptr<AsyncSocket> socket =
1109 AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1110 evb.loop(); // loop until the socket is connected
1112 // Accept the connection
1113 std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
1115 acceptedSocket->setReadCB(&rcb);
1117 // Shutdown the write side of client socket (read side of server socket)
1118 socket->shutdownWrite();
1121 // Check that accepted socket is still writable
// good() is expected to be false here while writable() stays true.
// NOTE(review): presumably good() turns false because the read side has
// reached EOF - confirm against AsyncSocket::good().
1122 ASSERT_FALSE(acceptedSocket->good());
1123 ASSERT_TRUE(acceptedSocket->writable());
1125 // Write data to accepted socket
1126 constexpr size_t simpleBufLength = 5;
1127 char simpleBuf[simpleBufLength];
1128 memset(simpleBuf, 'a', simpleBufLength);
1130 acceptedSocket->write(&wcb, simpleBuf, simpleBufLength);
1133 // Make sure we were able to write even after getting a read EOF
1134 ASSERT_EQ(rcb.state, STATE_SUCCEEDED); // this indicates EOF
1135 ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
1139 * Test that bytes written is correctly computed in case of write failure
// Verifies that the bytesWritten value reported to the write callback stays
// within [kMinExpectedBytesWritten, kMaxExpectedBytesWritten] when the peer
// resets the connection in the middle of a large write.
1141 TEST(AsyncSocketTest, WriteErrorCallbackBytesWritten) {
1142 // Send and receive buffer sizes for the sockets.
1143 constexpr size_t kSockBufSize = 8 * 1024;
1145 TestServer server(false, kSockBufSize);
// The client socket uses the same buffer sizes as the server and disables
// Nagle's algorithm (TCP_NODELAY).
1147 AsyncSocket::OptionMap options{
1148 {{SOL_SOCKET, SO_SNDBUF}, kSockBufSize},
1149 {{SOL_SOCKET, SO_RCVBUF}, kSockBufSize},
1150 {{IPPROTO_TCP, TCP_NODELAY}, 1},
1153 // The current thread will be used by the receiver - use a separate thread
1155 EventBase senderEvb;
1156 std::thread senderThread([&]() { senderEvb.loopForever(); });
1159 std::shared_ptr<AsyncSocket> socket;
// The socket is created and connected on the sender's EventBase thread.
1161 senderEvb.runInEventBaseThreadAndWait([&]() {
1162 socket = AsyncSocket::newSocket(&senderEvb);
1163 socket->connect(&ccb, server.getAddress(), 30, options);
1166 // accept the socket on the server side
1167 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
1169 // Send a big (45KB) write so that it is partially written. The first write
1170 // is 16KB (8KB on both sides) and subsequent writes are 8KB each. Reading
1171 // just under 24KB would cause 3-4 writes for the total of 32-40KB in the
1172 // following sequence: 16KB + 8KB + 8KB (+ 8KB). This ensures that not all
1173 // bytes are written when the socket is reset. Having at least 3 writes
1174 // ensures that the total size (45KB) would be exceeded in case of overcounting
1175 // based on the initial write size of 16KB.
1176 constexpr size_t kSendSize = 45 * 1024;
1177 auto const sendBuf = std::vector<char>(kSendSize, 'a');
1181 senderEvb.runInEventBaseThreadAndWait(
1182 [&]() { socket->write(&wcb, sendBuf.data(), kSendSize); });
1184 // Reading 20KB would cause three additional writes of 8KB, but less
1185 // than 45KB total, so the socket is reset before all bytes are written.
1186 constexpr size_t kRecvSize = 20 * 1024;
1187 uint8_t recvBuf[kRecvSize];
1188 int bytesRead = acceptedSocket->readAll(recvBuf, sizeof(recvBuf));
1189 ASSERT_EQ(kRecvSize, bytesRead);
// Lower and upper bounds for bytesWritten; the static_asserts pin the
// arithmetic so a change to the constants above cannot silently loosen them.
1191 constexpr size_t kMinExpectedBytesWritten = // 20 ACK + 8 send buf
1192 kRecvSize + kSockBufSize;
1193 static_assert(kMinExpectedBytesWritten == 28 * 1024, "bad math");
1194 static_assert(kMinExpectedBytesWritten > kRecvSize, "bad math");
1196 constexpr size_t kMaxExpectedBytesWritten = // 24 ACK + 8 sent + 8 send buf
1197 constexpr_ceil(kRecvSize, kSockBufSize) + 2 * kSockBufSize;
1198 static_assert(kMaxExpectedBytesWritten == 40 * 1024, "bad math");
1199 static_assert(kMaxExpectedBytesWritten < kSendSize, "bad math");
1201 // Need to delay after receiving 20KB and before closing the receive side so
1202 // that the send side has a chance to write at least kMinExpectedBytesWritten.
// The wait is a yield loop bounded by a 2 second deadline.
1203 using clock = std::chrono::steady_clock;
1204 auto const deadline = clock::now() + std::chrono::seconds(2);
1205 while (wcb.bytesWritten < kMinExpectedBytesWritten &&
1206 clock::now() < deadline) {
1207 std::this_thread::yield();
1209 acceptedSocket->closeWithReset();
// Stop the sender's loop and join its thread before inspecting wcb.
1211 senderEvb.terminateLoopSoon();
1212 senderThread.join();
1214 ASSERT_EQ(STATE_FAILED, wcb.state);
1215 ASSERT_LE(kMinExpectedBytesWritten, wcb.bytesWritten);
1216 ASSERT_GE(kMaxExpectedBytesWritten, wcb.bytesWritten);
1220 * Test writing a mix of simple buffers and IOBufs
// Writes a plain buffer, a single-element IOBuf chain and a two-element IOBuf
// chain to the client socket, then checks that the accepted socket received
// all bytes, in order, in a single read buffer.
1222 TEST(AsyncSocketTest, WriteIOBuf) {
1227 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
1229 socket->connect(&ccb, server.getAddress(), 30);
1231 // Accept the connection
1232 std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
1234 acceptedSocket->setReadCB(&rcb);
1236 // Check if EOR tracking flag can be set and reset.
1237 EXPECT_FALSE(socket->isEorTrackingEnabled());
1238 socket->setEorTracking(true);
1239 EXPECT_TRUE(socket->isEorTrackingEnabled());
1240 socket->setEorTracking(false);
1241 EXPECT_FALSE(socket->isEorTrackingEnabled());
1243 // Write a simple buffer to the socket
1244 constexpr size_t simpleBufLength = 5;
1245 char simpleBuf[simpleBufLength];
1246 memset(simpleBuf, 'a', simpleBufLength);
1248 socket->write(&wcb, simpleBuf, simpleBufLength);
1250 // Write a single-element IOBuf chain
1251 size_t buf1Length = 7;
1252 unique_ptr<IOBuf> buf1(IOBuf::create(buf1Length));
1253 memset(buf1->writableData(), 'b', buf1Length);
1254 buf1->append(buf1Length);
// Keep a clone for the comparison below, since buf1 is moved into writeChain().
1255 unique_ptr<IOBuf> buf1Copy(buf1->clone());
1257 socket->writeChain(&wcb2, std::move(buf1));
1259 // Write a multiple-element IOBuf chain
1260 size_t buf2Length = 11;
1261 unique_ptr<IOBuf> buf2(IOBuf::create(buf2Length));
1262 memset(buf2->writableData(), 'c', buf2Length);
1263 buf2->append(buf2Length);
1264 size_t buf3Length = 13;
1265 unique_ptr<IOBuf> buf3(IOBuf::create(buf3Length));
1266 memset(buf3->writableData(), 'd', buf3Length);
1267 buf3->append(buf3Length);
1268 buf2->appendChain(std::move(buf3));
// Clone and coalesce the chain so the expected bytes can be compared with a
// single memcmp() later on.
1269 unique_ptr<IOBuf> buf2Copy(buf2->clone());
1270 buf2Copy->coalesce();
1272 socket->writeChain(&wcb3, std::move(buf2));
1273 socket->shutdownWrite();
1275 // Let the reads and writes run to completion
1278 ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
1279 ASSERT_EQ(wcb2.state, STATE_SUCCEEDED);
1280 ASSERT_EQ(wcb3.state, STATE_SUCCEEDED);
1282 // Make sure the reader got the right data in the right order
1283 ASSERT_EQ(rcb.state, STATE_SUCCEEDED);
1284 ASSERT_EQ(rcb.buffers.size(), 1);
1285 ASSERT_EQ(rcb.buffers[0].length,
1286 simpleBufLength + buf1Length + buf2Length + buf3Length);
1288 memcmp(rcb.buffers[0].buffer, simpleBuf, simpleBufLength), 0);
1290 memcmp(rcb.buffers[0].buffer + simpleBufLength,
1291 buf1Copy->data(), buf1Copy->length()), 0);
1293 memcmp(rcb.buffers[0].buffer + simpleBufLength + buf1Length,
1294 buf2Copy->data(), buf2Copy->length()), 0);
1296 acceptedSocket->close();
// The client called shutdownWrite() itself, so it must report being closed by
// itself and not by the peer.
1299 ASSERT_TRUE(socket->isClosedBySelf());
1300 ASSERT_FALSE(socket->isClosedByPeer());
// Verifies the grouping of data seen by the reader when one of several delayed
// writes is issued with the cork flag set.
1303 TEST(AsyncSocketTest, WriteIOBufCorked) {
1308 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
1310 socket->connect(&ccb, server.getAddress(), 30);
1312 // Accept the connection
1313 std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
1315 acceptedSocket->setReadCB(&rcb);
1317 // Do three writes: the first immediately, the second after 100ms with
1318 // the "cork" flag set, and the third after 140ms. The reader should see
1319 // the first write arrive by itself, followed by the second and third
1320 // writes arriving together.
1321 size_t buf1Length = 5;
1322 unique_ptr<IOBuf> buf1(IOBuf::create(buf1Length));
1323 memset(buf1->writableData(), 'a', buf1Length);
1324 buf1->append(buf1Length);
1325 size_t buf2Length = 7;
1326 unique_ptr<IOBuf> buf2(IOBuf::create(buf2Length));
1327 memset(buf2->writableData(), 'b', buf2Length);
1328 buf2->append(buf2Length);
1329 size_t buf3Length = 11;
1330 unique_ptr<IOBuf> buf3(IOBuf::create(buf3Length));
1331 memset(buf3->writableData(), 'c', buf3Length);
1332 buf3->append(buf3Length);
1334 socket->writeChain(&wcb1, std::move(buf1));
// DelayedWrite arguments: (socket, bufs, callback, cork, lastWrite).
// write2 is corked; write3 is not corked and is flagged as the last write.
1336 DelayedWrite write2(socket, std::move(buf2), &wcb2, true);
1337 write2.scheduleTimeout(100);
1339 DelayedWrite write3(socket, std::move(buf3), &wcb3, false, true);
1340 write3.scheduleTimeout(140);
1343 ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
1344 ASSERT_EQ(wcb1.state, STATE_SUCCEEDED);
1345 ASSERT_EQ(wcb2.state, STATE_SUCCEEDED);
// Rethrow the stored exception so the failure reason shows up in the test
// output instead of only a state mismatch.
1346 if (wcb3.state != STATE_SUCCEEDED) {
1347 throw(wcb3.exception);
1349 ASSERT_EQ(wcb3.state, STATE_SUCCEEDED);
1351 // Make sure the reader got the data with the right grouping
1352 ASSERT_EQ(rcb.state, STATE_SUCCEEDED);
1353 ASSERT_EQ(rcb.buffers.size(), 2);
1354 ASSERT_EQ(rcb.buffers[0].length, buf1Length);
1355 ASSERT_EQ(rcb.buffers[1].length, buf2Length + buf3Length);
1357 acceptedSocket->close();
1360 ASSERT_TRUE(socket->isClosedBySelf());
1361 ASSERT_FALSE(socket->isClosedByPeer());
1365 * Test performing a zero-length write
// Verifies that zero-length write() calls interleaved with two 1MB writes all
// succeed, and that the reader receives exactly the len1 + len2 bytes of buf.
1367 TEST(AsyncSocketTest, ZeroLengthWrite) {
1372 std::shared_ptr<AsyncSocket> socket =
1373 AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1374 evb.loop(); // loop until the socket is connected
1376 auto acceptedSocket = server.acceptAsync(&evb);
1378 acceptedSocket->setReadCB(&rcb);
1380 size_t len1 = 1024*1024;
1381 size_t len2 = 1024*1024;
1382 std::unique_ptr<char[]> buf(new char[len1 + len2]);
// Fill the first half with 'a' and the second half with 'b'. The second
// memset must start at offset len1: starting at offset 0 would overwrite the
// 'a' region and leave the second half uninitialized, and that uninitialized
// memory is later written to the socket and compared by verifyData().
1383 memset(buf.get(), 'a', len1);
1384 memset(buf.get() + len1, 'b', len2);
// wcb1 and wcb3 are the zero-length writes; wcb2 and wcb4 carry the data.
1390 socket->write(&wcb1, buf.get(), 0);
1391 socket->write(&wcb2, buf.get(), len1);
1392 socket->write(&wcb3, buf.get() + len1, 0);
1393 socket->write(&wcb4, buf.get() + len1, len2);
1396 evb.loop(); // loop until the data is sent
1398 ASSERT_EQ(wcb1.state, STATE_SUCCEEDED);
1399 ASSERT_EQ(wcb2.state, STATE_SUCCEEDED);
1400 ASSERT_EQ(wcb3.state, STATE_SUCCEEDED);
1401 ASSERT_EQ(wcb4.state, STATE_SUCCEEDED);
1402 rcb.verifyData(buf.get(), len1 + len2);
1404 ASSERT_TRUE(socket->isClosedBySelf());
1405 ASSERT_FALSE(socket->isClosedByPeer());
// Verifies that writev() succeeds when the iovec array mixes data entries with
// other entries, and that the reader receives exactly the len1 + len2 bytes of
// buf.
1408 TEST(AsyncSocketTest, ZeroLengthWritev) {
1413 std::shared_ptr<AsyncSocket> socket =
1414 AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1415 evb.loop(); // loop until the socket is connected
1417 auto acceptedSocket = server.acceptAsync(&evb);
1419 acceptedSocket->setReadCB(&rcb);
1421 size_t len1 = 1024*1024;
1422 size_t len2 = 1024*1024;
1423 std::unique_ptr<char[]> buf(new char[len1 + len2]);
// Fill the first half with 'a' and the second half with 'b'. The second
// memset must start at offset len1: starting at offset 0 would overwrite the
// 'a' region and leave the second half uninitialized, and that uninitialized
// memory is later written to the socket and compared by verifyData().
1424 memset(buf.get(), 'a', len1);
1425 memset(buf.get() + len1, 'b', len2);
// iov[0] and iov[2] carry the two data halves. NOTE(review): iov[1] and iov[3]
// are presumably the zero-length entries; their iov_len assignments are not
// visible in this view, so confirm against the full file.
1428 constexpr size_t iovCount = 4;
1429 struct iovec iov[iovCount];
1430 iov[0].iov_base = buf.get();
1431 iov[0].iov_len = len1;
1432 iov[1].iov_base = buf.get() + len1;
1434 iov[2].iov_base = buf.get() + len1;
1435 iov[2].iov_len = len2;
1436 iov[3].iov_base = buf.get() + len1 + len2;
1439 socket->writev(&wcb, iov, iovCount);
1441 evb.loop(); // loop until the data is sent
1443 ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
1444 rcb.verifyData(buf.get(), len1 + len2);
1446 ASSERT_TRUE(socket->isClosedBySelf());
1447 ASSERT_FALSE(socket->isClosedByPeer());
1450 ///////////////////////////////////////////////////////////////////////////
1451 // close() related tests
1452 ///////////////////////////////////////////////////////////////////////////
1455 * Test calling close() with pending writes when the socket is already closing.
// Queues writes until five of them are blocked, arranges for each blocked
// write's error callback to call close() on the socket, and then checks that
// every blocked write ends up in STATE_FAILED.
1457 TEST(AsyncSocketTest, ClosePendingWritesWhileClosing) {
1462 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
1464 socket->connect(&ccb, server.getAddress(), 30);
1466 // accept the socket on the server side
1467 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
1469 // Loop to ensure the connect has completed
1472 // Make sure we are connected
1473 ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
1475 // Schedule pending writes, until several write attempts have blocked
1477 memset(buf, 'a', sizeof(buf));
1478 typedef vector< std::shared_ptr<WriteCallback> > WriteCallbackVector;
1479 WriteCallbackVector writeCallbacks;
// Only writes that did not complete immediately are collected; stop once five
// such writes are pending.
1481 writeCallbacks.reserve(5);
1482 while (writeCallbacks.size() < 5) {
1483 std::shared_ptr<WriteCallback> wcb(new WriteCallback);
1485 socket->write(wcb.get(), buf, sizeof(buf));
1486 if (wcb->state == STATE_SUCCEEDED) {
1487 // Succeeded immediately. Keep performing more writes
1491 // This write is blocked.
1492 // Have the write callback call close() when writeError() is invoked
1493 wcb->errorCallback = std::bind(&AsyncSocket::close, socket.get());
1494 writeCallbacks.push_back(wcb);
1497 // Call closeNow() to immediately fail the pending writes
1500 // Make sure writeError() was invoked on all of the pending write callbacks
1501 for (WriteCallbackVector::const_iterator it = writeCallbacks.begin();
1502 it != writeCallbacks.end();
1504 ASSERT_EQ((*it)->state, STATE_FAILED);
1507 ASSERT_TRUE(socket->isClosedBySelf());
1508 ASSERT_FALSE(socket->isClosedByPeer());
1511 ///////////////////////////////////////////////////////////////////////////
1512 // ImmediateRead related tests
1513 ///////////////////////////////////////////////////////////////////////////
1515 /* AsyncSocket subclass used to verify that immediate read works */
// It overrides checkForImmediateRead() to record that the hook was called
// (immediateReadCalled) and then runs AsyncSocket::handleRead().
1516 class AsyncSocketImmediateRead : public folly::AsyncSocket {
// Set to true each time checkForImmediateRead() runs; tests reset it to false
// before the step they want to observe.
1518 bool immediateReadCalled = false;
1519 explicit AsyncSocketImmediateRead(folly::EventBase* evb) : AsyncSocket(evb) {}
1521 void checkForImmediateRead() noexcept override {
1522 immediateReadCalled = true;
1523 AsyncSocket::handleRead();
// The client sends expectedDataSz bytes, the server echoes them back and
// closes, and the client reads them with a buffer of maxBufferSz bytes and one
// read per event. The test expects all data to arrive and expects
// checkForImmediateRead() to have been invoked.
1527 TEST(AsyncSocket, ConnectReadImmediateRead) {
1530 const size_t maxBufferSz = 100;
1531 const size_t maxReadsPerEvent = 1;
// The payload is three times the read buffer size, so it cannot be consumed
// with a single buffer.
1532 const size_t expectedDataSz = maxBufferSz * 3;
1533 char expectedData[expectedDataSz];
1534 memset(expectedData, 'j', expectedDataSz);
1537 ReadCallback rcb(maxBufferSz);
1538 AsyncSocketImmediateRead socket(&evb);
1539 socket.connect(nullptr, server.getAddress(), 30);
1541 evb.loop(); // loop until the socket is connected
1543 socket.setReadCB(&rcb);
1544 socket.setMaxReadsPerEvent(maxReadsPerEvent);
// Reset the flag so that only calls made from this point on are observed.
1545 socket.immediateReadCalled = false;
1547 auto acceptedSocket = server.acceptAsync(&evb);
1549 ReadCallback rcbServer;
1550 WriteCallback wcbServer;
// Server side: once the full payload has been received, verify it, echo it
// back and close the accepted socket.
1551 rcbServer.dataAvailableCallback = [&]() {
1552 if (rcbServer.dataRead() == expectedDataSz) {
1553 // write back all data read
1554 rcbServer.verifyData(expectedData, expectedDataSz);
1555 acceptedSocket->write(&wcbServer, expectedData, expectedDataSz);
1556 acceptedSocket->close();
1559 acceptedSocket->setReadCB(&rcbServer);
1563 socket.write(&wcb1, expectedData, expectedDataSz);
1565 ASSERT_EQ(wcb1.state, STATE_SUCCEEDED);
1566 rcb.verifyData(expectedData, expectedDataSz);
1567 ASSERT_EQ(socket.immediateReadCalled, true);
1569 ASSERT_FALSE(socket.isClosedBySelf());
1570 ASSERT_FALSE(socket.isClosedByPeer());
// Same echo setup as ConnectReadImmediateRead, but the client's read callback
// uninstalls itself from within dataAvailableCallback. The test expects only
// maxBufferSz bytes to have been read and expects checkForImmediateRead() not
// to have been invoked.
1573 TEST(AsyncSocket, ConnectReadUninstallRead) {
1576 const size_t maxBufferSz = 100;
1577 const size_t maxReadsPerEvent = 1;
1578 const size_t expectedDataSz = maxBufferSz * 3;
1579 char expectedData[expectedDataSz];
1580 memset(expectedData, 'k', expectedDataSz);
1583 ReadCallback rcb(maxBufferSz);
1584 AsyncSocketImmediateRead socket(&evb);
1585 socket.connect(nullptr, server.getAddress(), 30);
1587 evb.loop(); // loop until the socket is connected
1589 socket.setReadCB(&rcb);
1590 socket.setMaxReadsPerEvent(maxReadsPerEvent);
// Reset the flag so that only calls made from this point on are observed.
1591 socket.immediateReadCalled = false;
1593 auto acceptedSocket = server.acceptAsync(&evb);
1595 ReadCallback rcbServer;
1596 WriteCallback wcbServer;
// Server side: once the full payload has been received, verify it, echo it
// back and close the accepted socket.
1597 rcbServer.dataAvailableCallback = [&]() {
1598 if (rcbServer.dataRead() == expectedDataSz) {
1599 // write back all data read
1600 rcbServer.verifyData(expectedData, expectedDataSz);
1601 acceptedSocket->write(&wcbServer, expectedData, expectedDataSz);
1602 acceptedSocket->close();
1605 acceptedSocket->setReadCB(&rcbServer);
1607 rcb.dataAvailableCallback = [&]() {
1608 // we read data and reset readCB
1609 socket.setReadCB(nullptr);
1614 socket.write(&wcb, expectedData, expectedDataSz);
1616 ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
1618 /* we should've only read maxBufferSz data since readCallback_
1619 * was reset in dataAvailableCallback */
1620 ASSERT_EQ(rcb.dataRead(), maxBufferSz);
1621 ASSERT_EQ(socket.immediateReadCalled, false);
1623 ASSERT_FALSE(socket.isClosedBySelf());
1624 ASSERT_FALSE(socket.isClosedByPeer());
1628 // - Test connect() and have the connect callback set the read callback
1629 // - Test connect() and have the connect callback unset the read callback
1630 // - Test reading/writing/closing/destroying the socket in the connect callback
1631 // - Test reading/writing/closing/destroying the socket in the read callback
1632 // - Test reading/writing/closing/destroying the socket in the write callback
1633 // - Test one-way shutdown behavior
1634 // - Test changing the EventBase
1636 // - TODO: test multiple threads sharing a AsyncSocket, and detaching from it
1637 // in connectSuccess(), readDataAvailable(), writeSuccess()
1640 ///////////////////////////////////////////////////////////////////////////
1641 // AsyncServerSocket tests
1642 ///////////////////////////////////////////////////////////////////////////
1645 * Make sure accepted sockets have O_NONBLOCK and TCP_NODELAY set
// Accepts a single connection through AsyncServerSocket and checks, on the raw
// accepted fd, that O_NONBLOCK and TCP_NODELAY are already set.
1647 TEST(AsyncSocketTest, ServerAcceptOptions) {
1648 EventBase eventBase;
1650 // Create a server socket
1651 std::shared_ptr<AsyncServerSocket> serverSocket(
1652 AsyncServerSocket::newSocket(&eventBase));
// bind(0) is used and the resulting address is read back with getAddress().
1653 serverSocket->bind(0);
1654 serverSocket->listen(16);
1655 folly::SocketAddress serverAddress;
1656 serverSocket->getAddress(&serverAddress);
1658 // Add a callback to accept one connection then stop the loop
1659 TestAcceptCallback acceptCallback;
1660 acceptCallback.setConnectionAcceptedFn(
1661 [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1662 serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
1664 acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
1665 serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
1667 serverSocket->addAcceptCallback(&acceptCallback, &eventBase);
1668 serverSocket->startAccepting();
1670 // Connect to the server socket
1671 std::shared_ptr<AsyncSocket> socket(
1672 AsyncSocket::newSocket(&eventBase, serverAddress));
1676 // Verify that the server accepted a connection
// Expected event sequence: START, ACCEPT, STOP.
1677 ASSERT_EQ(acceptCallback.getEvents()->size(), 3);
1678 ASSERT_EQ(acceptCallback.getEvents()->at(0).type,
1679 TestAcceptCallback::TYPE_START);
1680 ASSERT_EQ(acceptCallback.getEvents()->at(1).type,
1681 TestAcceptCallback::TYPE_ACCEPT);
1682 ASSERT_EQ(acceptCallback.getEvents()->at(2).type,
1683 TestAcceptCallback::TYPE_STOP);
// The ACCEPT event (index 1) carries the accepted connection's fd.
1684 int fd = acceptCallback.getEvents()->at(1).fd;
1686 // The accepted connection should already be in non-blocking mode
1687 int flags = fcntl(fd, F_GETFL, 0);
1688 ASSERT_EQ(flags & O_NONBLOCK, O_NONBLOCK);
1691 // The accepted connection should already have TCP_NODELAY set
1693 socklen_t valueLength = sizeof(value);
1694 int rc = getsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &value, &valueLength);
1696 ASSERT_EQ(value, 1);
1701 * Test AsyncServerSocket::removeAcceptCallback()
// Registers seven accept callbacks on one AsyncServerSocket and has some of
// them remove other callbacks, remove themselves, open further connections, or
// destroy the server socket. The recorded event list of each callback is then
// checked against the expected START / ACCEPT / STOP sequence.
1703 TEST(AsyncSocketTest, RemoveAcceptCallback) {
1704 // Create a new AsyncServerSocket
1705 EventBase eventBase;
1706 std::shared_ptr<AsyncServerSocket> serverSocket(
1707 AsyncServerSocket::newSocket(&eventBase));
1708 serverSocket->bind(0);
1709 serverSocket->listen(16);
1710 folly::SocketAddress serverAddress;
1711 serverSocket->getAddress(&serverAddress);
1713 // Add several accept callbacks
1714 TestAcceptCallback cb1;
1715 TestAcceptCallback cb2;
1716 TestAcceptCallback cb3;
1717 TestAcceptCallback cb4;
1718 TestAcceptCallback cb5;
1719 TestAcceptCallback cb6;
1720 TestAcceptCallback cb7;
1722 // Test having callbacks remove other callbacks before them on the list,
1723 // after them on the list, or removing themselves.
1725 // Have callback 2 remove callback 3 and callback 5 the first time it is
// The trailing comments on the newSocket() calls below name the callback that
// is expected to receive that connection and what it does with it.
1728 cb1.setConnectionAcceptedFn([&](int /* fd */,
1729 const folly::SocketAddress& /* addr */) {
1730 std::shared_ptr<AsyncSocket> sock2(
1731 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb2: -cb3 -cb5
1733 cb3.setConnectionAcceptedFn(
1734 [&](int /* fd */, const folly::SocketAddress& /* addr */) {});
1735 cb4.setConnectionAcceptedFn(
1736 [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1737 std::shared_ptr<AsyncSocket> sock3(
1738 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb4
1740 cb5.setConnectionAcceptedFn(
1741 [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1742 std::shared_ptr<AsyncSocket> sock5(
1743 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb7: -cb7
1746 cb2.setConnectionAcceptedFn(
1747 [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1748 if (cb2Count == 0) {
1749 serverSocket->removeAcceptCallback(&cb3, nullptr);
1750 serverSocket->removeAcceptCallback(&cb5, nullptr);
1754 // Have callback 6 remove callback 4 the first time it is called,
1755 // and destroy the server socket the second time it is called
1757 cb6.setConnectionAcceptedFn(
1758 [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1759 if (cb6Count == 0) {
1760 serverSocket->removeAcceptCallback(&cb4, nullptr);
1761 std::shared_ptr<AsyncSocket> sock6(
1762 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
1763 std::shared_ptr<AsyncSocket> sock7(
1764 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb2
1765 std::shared_ptr<AsyncSocket> sock8(
1766 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb6: stop
1769 serverSocket.reset();
1773 // Have callback 7 remove itself
1774 cb7.setConnectionAcceptedFn(
1775 [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1776 serverSocket->removeAcceptCallback(&cb7, nullptr);
// Registration order matters: the checks at the end of this test rely on the
// callbacks being invoked in the order they are added here.
1779 serverSocket->addAcceptCallback(&cb1, &eventBase);
1780 serverSocket->addAcceptCallback(&cb2, &eventBase);
1781 serverSocket->addAcceptCallback(&cb3, &eventBase);
1782 serverSocket->addAcceptCallback(&cb4, &eventBase);
1783 serverSocket->addAcceptCallback(&cb5, &eventBase);
1784 serverSocket->addAcceptCallback(&cb6, &eventBase);
1785 serverSocket->addAcceptCallback(&cb7, &eventBase);
1786 serverSocket->startAccepting();
1788 // Make several connections to the socket
1789 std::shared_ptr<AsyncSocket> sock1(
1790 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
1791 std::shared_ptr<AsyncSocket> sock4(
1792 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb6: -cb4
1794 // Loop until we are stopped
1797 // Check to make sure that the expected callbacks were invoked.
1799 // NOTE: This code depends on the AsyncServerSocket operating calling all of
1800 // the AcceptCallbacks in round-robin fashion, in the order that they were
1801 // added. The code is implemented this way right now, but the API doesn't
1802 // explicitly require it be done this way. If we change the code not to be
1803 // exactly round robin in the future, we can simplify the test checks here.
1804 // (We'll also need to update the termination code, since we expect cb6 to
1805 // get called twice to terminate the loop.)
// cb1: START, ACCEPT, ACCEPT, STOP
1806 ASSERT_EQ(cb1.getEvents()->size(), 4);
1807 ASSERT_EQ(cb1.getEvents()->at(0).type,
1808 TestAcceptCallback::TYPE_START);
1809 ASSERT_EQ(cb1.getEvents()->at(1).type,
1810 TestAcceptCallback::TYPE_ACCEPT);
1811 ASSERT_EQ(cb1.getEvents()->at(2).type,
1812 TestAcceptCallback::TYPE_ACCEPT);
1813 ASSERT_EQ(cb1.getEvents()->at(3).type,
1814 TestAcceptCallback::TYPE_STOP);
// cb2: START, ACCEPT, ACCEPT, STOP
1816 ASSERT_EQ(cb2.getEvents()->size(), 4);
1817 ASSERT_EQ(cb2.getEvents()->at(0).type,
1818 TestAcceptCallback::TYPE_START);
1819 ASSERT_EQ(cb2.getEvents()->at(1).type,
1820 TestAcceptCallback::TYPE_ACCEPT);
1821 ASSERT_EQ(cb2.getEvents()->at(2).type,
1822 TestAcceptCallback::TYPE_ACCEPT);
1823 ASSERT_EQ(cb2.getEvents()->at(3).type,
1824 TestAcceptCallback::TYPE_STOP);
// cb3: START, STOP (removed by cb2 before accepting anything)
1826 ASSERT_EQ(cb3.getEvents()->size(), 2);
1827 ASSERT_EQ(cb3.getEvents()->at(0).type,
1828 TestAcceptCallback::TYPE_START);
1829 ASSERT_EQ(cb3.getEvents()->at(1).type,
1830 TestAcceptCallback::TYPE_STOP);
// cb4: START, ACCEPT, STOP
1832 ASSERT_EQ(cb4.getEvents()->size(), 3);
1833 ASSERT_EQ(cb4.getEvents()->at(0).type,
1834 TestAcceptCallback::TYPE_START);
1835 ASSERT_EQ(cb4.getEvents()->at(1).type,
1836 TestAcceptCallback::TYPE_ACCEPT);
1837 ASSERT_EQ(cb4.getEvents()->at(2).type,
1838 TestAcceptCallback::TYPE_STOP);
// cb5: START, STOP (removed by cb2 before accepting anything)
1840 ASSERT_EQ(cb5.getEvents()->size(), 2);
1841 ASSERT_EQ(cb5.getEvents()->at(0).type,
1842 TestAcceptCallback::TYPE_START);
1843 ASSERT_EQ(cb5.getEvents()->at(1).type,
1844 TestAcceptCallback::TYPE_STOP);
// cb6: START, ACCEPT, ACCEPT, STOP
1846 ASSERT_EQ(cb6.getEvents()->size(), 4);
1847 ASSERT_EQ(cb6.getEvents()->at(0).type,
1848 TestAcceptCallback::TYPE_START);
1849 ASSERT_EQ(cb6.getEvents()->at(1).type,
1850 TestAcceptCallback::TYPE_ACCEPT);
1851 ASSERT_EQ(cb6.getEvents()->at(2).type,
1852 TestAcceptCallback::TYPE_ACCEPT);
1853 ASSERT_EQ(cb6.getEvents()->at(3).type,
1854 TestAcceptCallback::TYPE_STOP);
// cb7: START, ACCEPT, STOP (removes itself on its first accept)
1856 ASSERT_EQ(cb7.getEvents()->size(), 3);
1857 ASSERT_EQ(cb7.getEvents()->at(0).type,
1858 TestAcceptCallback::TYPE_START);
1859 ASSERT_EQ(cb7.getEvents()->at(1).type,
1860 TestAcceptCallback::TYPE_ACCEPT);
1861 ASSERT_EQ(cb7.getEvents()->at(2).type,
1862 TestAcceptCallback::TYPE_STOP);
1866 * Test that accept callbacks are invoked on a thread other than the test's own
// Registers a single accept callback and checks that its started / accepted /
// stopped hooks all run on one thread, and that this thread is not the thread
// executing the test body.
1868 TEST(AsyncSocketTest, OtherThreadAcceptCallback) {
1869 // Create a new AsyncServerSocket
1870 EventBase eventBase;
1871 std::shared_ptr<AsyncServerSocket> serverSocket(
1872 AsyncServerSocket::newSocket(&eventBase));
1873 serverSocket->bind(0);
1874 serverSocket->listen(16);
1875 folly::SocketAddress serverAddress;
1876 serverSocket->getAddress(&serverAddress);
1878 // Add a single accept callback that records and validates the calling thread
1879 TestAcceptCallback cb1;
// thread_id starts as the test thread's id. acceptStarted must run on a
// different thread and then stores that thread's id for the later checks.
1880 auto thread_id = std::this_thread::get_id();
1881 cb1.setAcceptStartedFn([&](){
1882 CHECK_NE(thread_id, std::this_thread::get_id());
1883 thread_id = std::this_thread::get_id();
1885 cb1.setConnectionAcceptedFn(
1886 [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1887 ASSERT_EQ(thread_id, std::this_thread::get_id());
1888 serverSocket->removeAcceptCallback(&cb1, &eventBase);
1890 cb1.setAcceptStoppedFn([&](){
1891 ASSERT_EQ(thread_id, std::this_thread::get_id());
1894 // Register the callback and start accepting connections
1895 serverSocket->addAcceptCallback(&cb1, &eventBase);
1896 serverSocket->startAccepting();
1898 // Make a single connection to the socket
1899 std::shared_ptr<AsyncSocket> sock1(
1900 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
1902 // Loop in another thread
1903 auto other = std::thread([&](){
1908 // Check to make sure that the expected callbacks were invoked.
1910 // NOTE: cb1 is the only accept callback registered in this test, and it
1911 // removes itself as soon as the first connection is accepted. It should
1912 // therefore have recorded exactly three events, in this order: accept
1913 // started, one connection accepted, and accept stopped. (An earlier
1914 // version of this comment described round-robin dispatch across several
1915 // callbacks; that applies to RemoveAcceptCallback above, not to this
1916 // test.)
1917 ASSERT_EQ(cb1.getEvents()->size(), 3);
1918 ASSERT_EQ(cb1.getEvents()->at(0).type,
1919 TestAcceptCallback::TYPE_START);
1920 ASSERT_EQ(cb1.getEvents()->at(1).type,
1921 TestAcceptCallback::TYPE_ACCEPT);
1922 ASSERT_EQ(cb1.getEvents()->at(2).type,
1923 TestAcceptCallback::TYPE_STOP);
// Shared helper: checks that the given, already listening, AsyncServerSocket
// can accept one connection. It registers a one-shot accept callback, connects
// a client to the socket's own address, and asserts that the callback recorded
// START, ACCEPT, STOP.
//
// serverSocket: the server socket under test; its EventBase is used for the
//               callback and for the client connection.
1927 void serverSocketSanityTest(AsyncServerSocket* serverSocket) {
1928 EventBase* eventBase = serverSocket->getEventBase();
1931 // Add a callback to accept one connection then stop accepting
1932 TestAcceptCallback acceptCallback;
1933 acceptCallback.setConnectionAcceptedFn(
1934 [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1935 serverSocket->removeAcceptCallback(&acceptCallback, eventBase);
// An accept error also removes the callback; the event-count assertions below
// then fail instead of the loop waiting for a connection.
1937 acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
1938 serverSocket->removeAcceptCallback(&acceptCallback, eventBase);
1940 serverSocket->addAcceptCallback(&acceptCallback, eventBase);
1941 serverSocket->startAccepting();
1943 // Connect to the server socket
1944 folly::SocketAddress serverAddress;
1945 serverSocket->getAddress(&serverAddress);
1946 AsyncSocket::UniquePtr socket(new AsyncSocket(eventBase, serverAddress));
1948 // Loop to process all events
1951 // Verify that the server accepted a connection
1952 ASSERT_EQ(acceptCallback.getEvents()->size(), 3);
1953 ASSERT_EQ(acceptCallback.getEvents()->at(0).type,
1954 TestAcceptCallback::TYPE_START);
1955 ASSERT_EQ(acceptCallback.getEvents()->at(1).type,
1956 TestAcceptCallback::TYPE_ACCEPT);
1957 ASSERT_EQ(acceptCallback.getEvents()->at(2).type,
1958 TestAcceptCallback::TYPE_STOP);
1961 /* Verify that we don't leak sockets if we are destroyed()
1962 * and there are still writes pending
1964 * If destroy() only calls close() instead of closeNow(),
1965 * it would shutdown(writes) on the socket, but it would
1966 * never be close()'d, and the socket would leak
// Destroys the accepted socket while a large write is outstanding, then
// verifies that its file descriptor was really closed: read() on the saved fd
// must fail with EBADF.
1968 TEST(AsyncSocketTest, DestroyCloseTest) {
1974 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&clientEB);
1976 socket->connect(&ccb, server.getAddress(), 30);
1978 // Accept the connection
1979 std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&serverEB);
1981 acceptedSocket->setReadCB(&rcb);
1983 // Write a large buffer to the socket that is larger than kernel buffer
1984 size_t simpleBufLength = 5000000;
// NOTE(review): no matching delete[] for this allocation is visible in this
// view; confirm against the full file that the buffer is released.
1985 char* simpleBuf = new char[simpleBufLength];
1986 memset(simpleBuf, 'a', simpleBufLength);
1989 // Let the reads and writes run to completion
// Save the raw fd before the socket object is destroyed so it can be probed
// afterwards.
1990 int fd = acceptedSocket->getFd();
1992 acceptedSocket->write(&wcb, simpleBuf, simpleBufLength);
// Drop this shared_ptr reference to the accepted socket while the write above
// is still outstanding.
1994 acceptedSocket.reset();
1996 // Test that server socket was closed
1997 folly::test::msvcSuppressAbortOnInvalidParams([&] {
1998 ssize_t sz = read(fd, simpleBuf, simpleBufLength);
2000 ASSERT_EQ(errno, EBADF);
2006 * Test AsyncServerSocket::useExistingSocket()
// Exercises AsyncServerSocket::useExistingSocket() with a manually created fd
// in three states: (1) only created, (2) created and bound, (3) created, bound
// and listening. Each resulting server socket is run through
// serverSocketSanityTest().
2008 TEST(AsyncSocketTest, ServerExistingSocket) {
2009 EventBase eventBase;
2011 // Test creating a socket, and letting AsyncServerSocket bind and listen
2013 // Manually create a socket
2014 int fd = fsp::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
2017 // Create a server socket
2018 AsyncServerSocket::UniquePtr serverSocket(
2019 new AsyncServerSocket(&eventBase));
2020 serverSocket->useExistingSocket(fd);
// The address read back from the not-yet-bound socket is what gets passed to
// bind() below.
2021 folly::SocketAddress address;
2022 serverSocket->getAddress(&address);
2024 serverSocket->bind(address);
2025 serverSocket->listen(16);
2027 // Make sure the socket works
2028 serverSocketSanityTest(serverSocket.get());
2031 // Test creating a socket and binding manually,
2032 // then letting AsyncServerSocket listen
2034 // Manually create a socket
2035 int fd = fsp::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
2038 struct sockaddr_in addr;
2039 addr.sin_family = AF_INET;
2041 addr.sin_addr.s_addr = INADDR_ANY;
2042 ASSERT_EQ(bind(fd, reinterpret_cast<struct sockaddr*>(&addr),
2044 // Look up the address that we bound to
2045 folly::SocketAddress boundAddress;
2046 boundAddress.setFromLocalAddress(fd);
2048 // Create a server socket
2049 AsyncServerSocket::UniquePtr serverSocket(
2050 new AsyncServerSocket(&eventBase));
2051 serverSocket->useExistingSocket(fd);
2052 serverSocket->listen(16);
2054 // Make sure AsyncServerSocket reports the same address that we bound to
2055 folly::SocketAddress serverSocketAddress;
2056 serverSocket->getAddress(&serverSocketAddress);
2057 ASSERT_EQ(boundAddress, serverSocketAddress);
2059 // Make sure the socket works
2060 serverSocketSanityTest(serverSocket.get());
2063 // Test creating a socket, binding and listening manually,
2064 // then giving it to AsyncServerSocket
2066 // Manually create a socket
2067 int fd = fsp::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
2070 struct sockaddr_in addr;
2071 addr.sin_family = AF_INET;
2073 addr.sin_addr.s_addr = INADDR_ANY;
2074 ASSERT_EQ(bind(fd, reinterpret_cast<struct sockaddr*>(&addr),
2076 // Look up the address that we bound to
2077 folly::SocketAddress boundAddress;
2078 boundAddress.setFromLocalAddress(fd);
// In this scenario listen() is called directly on the fd, so the
// AsyncServerSocket only adopts an already-listening socket.
2080 ASSERT_EQ(listen(fd, 16), 0);
2082 // Create a server socket
2083 AsyncServerSocket::UniquePtr serverSocket(
2084 new AsyncServerSocket(&eventBase));
2085 serverSocket->useExistingSocket(fd);
2087 // Make sure AsyncServerSocket reports the same address that we bound to
2088 folly::SocketAddress serverSocketAddress;
2089 serverSocket->getAddress(&serverSocketAddress);
2090 ASSERT_EQ(boundAddress, serverSocketAddress);
2092 // Make sure the socket works
2093 serverSocketSanityTest(serverSocket.get());
// Binds an AsyncServerSocket to a path-based (Unix domain) address, accepts
// one connection on it, and checks that the accepted fd is in non-blocking
// mode.
2097 TEST(AsyncSocketTest, UnixDomainSocketTest) {
2098 EventBase eventBase;
2100 // Create a server socket
2101 std::shared_ptr<AsyncServerSocket> serverSocket(
2102 AsyncServerSocket::newSocket(&eventBase));
// A random 64-bit suffix is appended to the socket path.
2104 path.append(folly::to<string>("/anonymous", folly::Random::rand64()));
2105 folly::SocketAddress serverAddress;
2106 serverAddress.setFromPath(path);
2107 serverSocket->bind(serverAddress);
2108 serverSocket->listen(16);
2110 // Add a callback to accept one connection then stop the loop
2111 TestAcceptCallback acceptCallback;
2112 acceptCallback.setConnectionAcceptedFn(
2113 [&](int /* fd */, const folly::SocketAddress& /* addr */) {
2114 serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
2116 acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
2117 serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
2119 serverSocket->addAcceptCallback(&acceptCallback, &eventBase);
2120 serverSocket->startAccepting();
2122 // Connect to the server socket
2123 std::shared_ptr<AsyncSocket> socket(
2124 AsyncSocket::newSocket(&eventBase, serverAddress));
2128 // Verify that the server accepted a connection
// Expected event sequence: START, ACCEPT, STOP.
2129 ASSERT_EQ(acceptCallback.getEvents()->size(), 3);
2130 ASSERT_EQ(acceptCallback.getEvents()->at(0).type,
2131 TestAcceptCallback::TYPE_START);
2132 ASSERT_EQ(acceptCallback.getEvents()->at(1).type,
2133 TestAcceptCallback::TYPE_ACCEPT);
2134 ASSERT_EQ(acceptCallback.getEvents()->at(2).type,
2135 TestAcceptCallback::TYPE_STOP);
// The ACCEPT event (index 1) carries the accepted connection's fd.
2136 int fd = acceptCallback.getEvents()->at(1).fd;
2138 // The accepted connection should already be in non-blocking mode
2139 int flags = fcntl(fd, F_GETFL, 0);
2140 ASSERT_EQ(flags & O_NONBLOCK, O_NONBLOCK);
// Checks the counters reported through a ConnectionEventCallback when the
// accept callback is registered with an explicit EventBase (&eventBase):
// one accepted connection, one enqueue and one dequeue for the accept
// callback, and no errors, drops or backoff events.
// NOTE(review): the event-loop run and the opening `ASSERT_EQ(` of the
// enqueued-counter check are not visible in this listing.
2143 TEST(AsyncSocketTest, ConnectionEventCallbackDefault) {
2144 EventBase eventBase;
2145 TestConnectionEventCallback connectionEventCallback;
2147 // Create a server socket
2148 std::shared_ptr<AsyncServerSocket> serverSocket(
2149 AsyncServerSocket::newSocket(&eventBase));
2150 serverSocket->setConnectionEventCallback(&connectionEventCallback);
2151 serverSocket->bind(0);
2152 serverSocket->listen(16);
2153 folly::SocketAddress serverAddress;
2154 serverSocket->getAddress(&serverAddress);
2156 // Add a callback to accept one connection then stop the loop
2157 TestAcceptCallback acceptCallback;
2158 acceptCallback.setConnectionAcceptedFn(
2159 [&](int /* fd */, const folly::SocketAddress& /* addr */) {
2160 serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
2162 acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
2163 serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
2165 serverSocket->addAcceptCallback(&acceptCallback, &eventBase);
2166 serverSocket->startAccepting();
2168 // Connect to the server socket
2169 std::shared_ptr<AsyncSocket> socket(
2170 AsyncSocket::newSocket(&eventBase, serverAddress));
2174 // Validate the connection event counters
2175 ASSERT_EQ(connectionEventCallback.getConnectionAccepted(), 1);
2176 ASSERT_EQ(connectionEventCallback.getConnectionAcceptedError(), 0);
2177 ASSERT_EQ(connectionEventCallback.getConnectionDropped(), 0);
2179 connectionEventCallback.getConnectionEnqueuedForAcceptCallback(), 1);
2180 ASSERT_EQ(connectionEventCallback.getConnectionDequeuedByAcceptCallback(), 1);
2181 ASSERT_EQ(connectionEventCallback.getBackoffStarted(), 0);
2182 ASSERT_EQ(connectionEventCallback.getBackoffEnded(), 0);
2183 ASSERT_EQ(connectionEventCallback.getBackoffError(), 0);
// Same scenario as a single accepted connection, but the accept callback is
// registered with a nullptr EventBase. The test expects the accept-started and
// accept-stopped hooks to have fired, one accepted connection, and zero
// enqueue/dequeue events for the accept callback.
// NOTE(review): the event-loop run, lambda closing braces and the opening
// `ASSERT_EQ(` of the enqueued-counter check are not visible in this listing.
2186 TEST(AsyncSocketTest, CallbackInPrimaryEventBase) {
2187 EventBase eventBase;
2188 TestConnectionEventCallback connectionEventCallback;
2190 // Create a server socket
2191 std::shared_ptr<AsyncServerSocket> serverSocket(
2192 AsyncServerSocket::newSocket(&eventBase));
2193 serverSocket->setConnectionEventCallback(&connectionEventCallback);
2194 serverSocket->bind(0);
2195 serverSocket->listen(16);
2196 folly::SocketAddress serverAddress;
2197 serverSocket->getAddress(&serverAddress);
2199 // Add a callback to accept one connection then stop the loop
2200 TestAcceptCallback acceptCallback;
2201 acceptCallback.setConnectionAcceptedFn(
2202 [&](int /* fd */, const folly::SocketAddress& /* addr */) {
2203 serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
2205 acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
2206 serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
// Flags recording that the accept-started / accept-stopped hooks ran.
2208 bool acceptStartedFlag{false};
2209 acceptCallback.setAcceptStartedFn([&acceptStartedFlag](){
2210 acceptStartedFlag = true;
2212 bool acceptStoppedFlag{false};
2213 acceptCallback.setAcceptStoppedFn([&acceptStoppedFlag](){
2214 acceptStoppedFlag = true;
// Unlike ConnectionEventCallbackDefault, no EventBase is supplied here.
2216 serverSocket->addAcceptCallback(&acceptCallback, nullptr);
2217 serverSocket->startAccepting();
2219 // Connect to the server socket
2220 std::shared_ptr<AsyncSocket> socket(
2221 AsyncSocket::newSocket(&eventBase, serverAddress));
2225 ASSERT_TRUE(acceptStartedFlag);
2226 ASSERT_TRUE(acceptStoppedFlag);
2227 // Validate the connection event counters
2228 ASSERT_EQ(connectionEventCallback.getConnectionAccepted(), 1);
2229 ASSERT_EQ(connectionEventCallback.getConnectionAcceptedError(), 0);
2230 ASSERT_EQ(connectionEventCallback.getConnectionDropped(), 0);
2232 connectionEventCallback.getConnectionEnqueuedForAcceptCallback(), 0);
2233 ASSERT_EQ(connectionEventCallback.getConnectionDequeuedByAcceptCallback(), 0);
2234 ASSERT_EQ(connectionEventCallback.getBackoffStarted(), 0);
2235 ASSERT_EQ(connectionEventCallback.getBackoffEnded(), 0);
2236 ASSERT_EQ(connectionEventCallback.getBackoffError(), 0);
2242 * Test AsyncServerSocket::getNumPendingMessagesInQueue()
// Connects four clients to one AsyncServerSocket and, inside the accept
// callback, compares getNumPendingMessagesInQueue() with `4 - count`.
// NOTE(review): the declaration and increment of `count`, the condition
// guarding removeAcceptCallback and the event-loop run are not visible in
// this listing — confirm against the complete file.
2244 TEST(AsyncSocketTest, NumPendingMessagesInQueue) {
2245 EventBase eventBase;
2247 // Counter of how many connections have been accepted
2250 // Create a server socket
2251 auto serverSocket(AsyncServerSocket::newSocket(&eventBase));
2252 serverSocket->bind(0);
2253 serverSocket->listen(16);
2254 folly::SocketAddress serverAddress;
2255 serverSocket->getAddress(&serverAddress);
2257 // Add a callback to accept connections
2258 TestAcceptCallback acceptCallback;
2259 acceptCallback.setConnectionAcceptedFn(
2260 [&](int /* fd */, const folly::SocketAddress& /* addr */) {
2262 ASSERT_EQ(4 - count, serverSocket->getNumPendingMessagesInQueue());
2265 // all messages are processed, remove accept callback
2266 serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
2269 acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
2270 serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
2272 serverSocket->addAcceptCallback(&acceptCallback, &eventBase);
2273 serverSocket->startAccepting();
2275 // Connect to the server socket, 4 clients, there are 4 connections
2276 auto socket1(AsyncSocket::newSocket(&eventBase, serverAddress));
2277 auto socket2(AsyncSocket::newSocket(&eventBase, serverAddress));
2278 auto socket3(AsyncSocket::newSocket(&eventBase, serverAddress));
2279 auto socket4(AsyncSocket::newSocket(&eventBase, serverAddress));
2285 * Test AsyncTransport::BufferCallback
// Writes a 100 KiB buffer through a socket whose SO_SNDBUF option is set to
// 128 and expects the BufferCallback to report both "buffered" and "buffer
// cleared", the write to succeed, and the server to receive the same bytes.
// NOTE(review): the declarations of `server`, `evb`, `ccb`, `wcb`, `bcb`, the
// event-loop run and the socket close are not visible in this listing.
2287 TEST(AsyncSocketTest, BufferTest) {
// Socket option map: SOL_SOCKET / SO_SNDBUF -> 128.
2291 AsyncSocket::OptionMap option{{{SOL_SOCKET, SO_SNDBUF}, 128}};
2292 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
2294 socket->connect(&ccb, server.getAddress(), 30, option);
2296 char buf[100 * 1024];
2297 memset(buf, 'c', sizeof(buf));
2300 socket->setBufferCallback(&bcb);
2301 socket->write(&wcb, buf, sizeof(buf), WriteFlags::NONE);
2304 ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
2305 ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
2307 ASSERT_TRUE(bcb.hasBuffered());
2308 ASSERT_TRUE(bcb.hasBufferCleared());
2311 server.verifyConnection(buf, sizeof(buf));
2313 ASSERT_TRUE(socket->isClosedBySelf());
2314 ASSERT_FALSE(socket->isClosedByPeer());
// Regression-style test for destroying the socket from inside the write
// success callback while a BufferCallback is installed (see the explanatory
// comment before the write() call).
// NOTE(review): the declarations of `server`, `evb`, `ccb`, `wcb`, `bcb`, the
// rest of the successCallback body and the event-loop runs are not visible in
// this listing.
2317 TEST(AsyncSocketTest, BufferCallbackKill) {
2320 AsyncSocket::OptionMap option{{{SOL_SOCKET, SO_SNDBUF}, 128}};
2321 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
2323 socket->connect(&ccb, server.getAddress(), 30, option);
2326 char buf[100 * 1024];
2327 memset(buf, 'c', sizeof(buf));
2329 socket->setBufferCallback(&bcb);
// The success callback first checks that this test holds the only reference.
2331 wcb.successCallback = [&] {
2332 ASSERT_TRUE(socket.unique());
2336 // This will trigger AsyncSocket::handleWrite,
2337 // which calls WriteCallback::writeSuccess,
2338 // which calls wcb.successCallback above,
2339 // which tries to delete socket
2340 // Then, the socket will also try to use this BufferCallback
2341 // And that should crash us, if there is no DestructorGuard on the stack
2342 socket->write(&wcb, buf, sizeof(buf), WriteFlags::NONE);
2345 ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
// TFO (TCP Fast Open) connect path: the client enables TFO, connect() reports
// success, and the first writeChain() is what triggers the actual connect.
// The server side accepts, sends 128 bytes of 'a' and reads the 3-byte "hey".
// NOTE(review): the declarations of `evb`, `cb`, `rcb`, the code that runs
// the server-side statements (thread/lambda) and the event-loop run are not
// visible in this listing.
2349 TEST(AsyncSocketTest, ConnectTFO) {
2350 // Start listening on a local port
2351 TestServer server(true);
2353 // Connect using an AsyncSocket
2355 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
2356 socket->enableTFO();
2358 socket->connect(&cb, server.getAddress(), 30);
// Payload the server sends to the client.
2360 std::array<uint8_t, 128> buf;
2361 memset(buf.data(), 'a', buf.size());
// readBuf receives what the server reads; sendBuf is what the client writes.
2363 std::array<uint8_t, 3> readBuf;
2364 auto sendBuf = IOBuf::copyBuffer("hey");
2367 auto acceptedSocket = server.accept();
2368 acceptedSocket->write(buf.data(), buf.size());
2369 acceptedSocket->flush();
2370 acceptedSocket->readAll(readBuf.data(), readBuf.size());
2371 acceptedSocket->close();
2376 ASSERT_EQ(cb.state, STATE_SUCCEEDED);
2377 EXPECT_LE(0, socket->getConnectTime().count());
2378 EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
2379 EXPECT_TRUE(socket->getTFOAttempted());
2381 // Should trigger the connect
2382 WriteCallback write;
2384 socket->writeChain(&write, sendBuf->clone());
2385 socket->setReadCB(&rcb);
// The server must have read "hey" and the client must have read all of buf.
2390 EXPECT_EQ(STATE_SUCCEEDED, write.state);
2391 EXPECT_EQ(0, memcmp(readBuf.data(), sendBuf->data(), readBuf.size()));
2392 EXPECT_EQ(STATE_SUCCEEDED, rcb.state);
2393 ASSERT_EQ(1, rcb.buffers.size());
2394 ASSERT_EQ(sizeof(buf), rcb.buffers[0].length);
2395 EXPECT_EQ(0, memcmp(rcb.buffers[0].buffer, buf.data(), buf.size()));
2396 EXPECT_EQ(socket->getTFOFinished(), socket->getTFOSucceded());
// Variant of the TFO connect test in which the read callback is installed
// right after connect(), i.e. before the first write triggers the connect.
// NOTE(review): the declarations of `evb`, `cb`, `rcb`, the code that runs
// the server-side statements and the event-loop run are not visible in this
// listing.
2399 TEST(AsyncSocketTest, ConnectTFOSupplyEarlyReadCB) {
2400 // Start listening on a local port
2401 TestServer server(true);
2403 // Connect using an AsyncSocket
2405 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
2406 socket->enableTFO();
2408 socket->connect(&cb, server.getAddress(), 30);
// Read callback is set early, before any data has been written.
2410 socket->setReadCB(&rcb);
2412 std::array<uint8_t, 128> buf;
2413 memset(buf.data(), 'a', buf.size());
2415 std::array<uint8_t, 3> readBuf;
2416 auto sendBuf = IOBuf::copyBuffer("hey");
2419 auto acceptedSocket = server.accept();
2420 acceptedSocket->write(buf.data(), buf.size());
2421 acceptedSocket->flush();
2422 acceptedSocket->readAll(readBuf.data(), readBuf.size());
2423 acceptedSocket->close();
2428 ASSERT_EQ(cb.state, STATE_SUCCEEDED);
2429 EXPECT_LE(0, socket->getConnectTime().count());
2430 EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
2431 EXPECT_TRUE(socket->getTFOAttempted());
2433 // Should trigger the connect
2434 WriteCallback write;
2435 socket->writeChain(&write, sendBuf->clone());
2440 EXPECT_EQ(STATE_SUCCEEDED, write.state);
2441 EXPECT_EQ(0, memcmp(readBuf.data(), sendBuf->data(), readBuf.size()));
2442 EXPECT_EQ(STATE_SUCCEEDED, rcb.state);
2443 ASSERT_EQ(1, rcb.buffers.size());
2444 ASSERT_EQ(sizeof(buf), rcb.buffers[0].length);
2445 EXPECT_EQ(0, memcmp(rcb.buffers[0].buffer, buf.data(), buf.size()));
2446 EXPECT_EQ(socket->getTFOFinished(), socket->getTFOSucceded());
2450 * Test connecting to a server that isn't listening
// TFO connect to [::1]:65535, where no listener is expected. connect() itself
// is expected to report success; the failure surfaces on the writes.
// NOTE(review): the declarations of `evb` and `cb`, the `else` between the
// two write1 expectations and the event-loop run are not visible in this
// listing.
2452 TEST(AsyncSocketTest, ConnectRefusedImmediatelyTFO) {
2455 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
2457 socket->enableTFO();
2459 // Hopefully nothing is actually listening on this address
2460 folly::SocketAddress addr("::1", 65535);
2462 socket->connect(&cb, addr, 30);
2466 WriteCallback write1;
2467 // Trigger the connect if TFO attempt is supported.
2468 socket->writeChain(&write1, IOBuf::copyBuffer("hey"));
2469 WriteCallback write2;
2470 socket->writeChain(&write2, IOBuf::copyBuffer("hey"));
// The expected state of write1 depends on whether the TFO attempt finished.
2473 if (!socket->getTFOFinished()) {
2474 EXPECT_EQ(STATE_FAILED, write1.state);
2476 EXPECT_EQ(STATE_SUCCEEDED, write1.state);
2477 EXPECT_FALSE(socket->getTFOSucceded());
2480 EXPECT_EQ(STATE_FAILED, write2.state);
2482 EXPECT_EQ(STATE_SUCCEEDED, cb.state);
2483 EXPECT_LE(0, socket->getConnectTime().count());
2484 EXPECT_EQ(std::chrono::milliseconds(30), socket->getConnectTimeout());
2485 EXPECT_TRUE(socket->getTFOAttempted());
2489 * Test calling closeNow() immediately after connecting.
// TFO-enabled socket: after connect() the connect callback is expected to be
// in STATE_SUCCEEDED and the socket closed by this side, not by the peer.
// NOTE(review): the declarations of `evb` and `ccb`, the write, the
// closeNow() call and the event-loop run are not visible in this listing.
2491 TEST(AsyncSocketTest, ConnectWriteAndCloseNowTFO) {
2492 TestServer server(true);
2496 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
2497 socket->enableTFO();
2500 socket->connect(&ccb, server.getAddress(), 30);
2503 std::array<char, 128> buf;
2504 memset(buf.data(), 'a', buf.size());
2509 // Loop, although there shouldn't be anything to do.
2512 ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
2514 ASSERT_TRUE(socket->isClosedBySelf());
2515 ASSERT_FALSE(socket->isClosedByPeer());
2519 * Test calling close() immediately after connect()
// TFO-enabled socket that is closed right after connect(): the connect
// callback is expected to be in STATE_SUCCEEDED and the socket closed by this
// side, not by the peer.
// NOTE(review): the declarations of `evb` and `ccb`, the close() call and the
// event-loop run are not visible in this listing.
2521 TEST(AsyncSocketTest, ConnectAndCloseTFO) {
2522 TestServer server(true);
2524 // Connect using an AsyncSocket
2526 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
2527 socket->enableTFO();
2530 socket->connect(&ccb, server.getAddress(), 30);
2534 // Loop, although there shouldn't be anything to do.
2537 // The connect callback is expected to report success (the assertion checks
// STATE_SUCCEEDED, not an aborted connect).
2538 ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
2540 ASSERT_TRUE(socket->isClosedBySelf());
2541 ASSERT_FALSE(socket->isClosedByPeer());
// AsyncSocket subclass whose tfoSendMsg() is a gmock method, so tests can
// dictate what the TFO send attempt returns.
// NOTE(review): access specifiers and the closing brace are not visible in
// this listing.
2544 class MockAsyncTFOSocket : public AsyncSocket {
2546 using UniquePtr = std::unique_ptr<MockAsyncTFOSocket, Destructor>;
2548 explicit MockAsyncTFOSocket(EventBase* evb) : AsyncSocket(evb) {}
2550 MOCK_METHOD3(tfoSendMsg, ssize_t(int fd, struct msghdr* msg, int msg_flags));
// The mocked tfoSendMsg() fails once with EOPNOTSUPP. The write is expected
// to stay in STATE_WAITING right after writeChain() and to succeed later,
// with data flowing in both directions.
// NOTE(review): the declarations of `evb`, `ccb`, `rcb`, the code that runs
// the server-side statements and the event-loop run are not visible in this
// listing.
2553 TEST(AsyncSocketTest, TestTFOUnsupported) {
2554 TestServer server(true);
2556 // Connect using an AsyncSocket
2558 auto socket = MockAsyncTFOSocket::UniquePtr(new MockAsyncTFOSocket(&evb));
2559 socket->enableTFO();
2562 socket->connect(&ccb, server.getAddress(), 30);
2563 ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
2566 socket->setReadCB(&rcb);
// Simulate a TFO send attempt failing with EOPNOTSUPP.
2568 EXPECT_CALL(*socket, tfoSendMsg(_, _, _))
2569 .WillOnce(SetErrnoAndReturn(EOPNOTSUPP, -1));
2570 WriteCallback write;
2571 auto sendBuf = IOBuf::copyBuffer("hey");
2572 socket->writeChain(&write, sendBuf->clone());
2573 EXPECT_EQ(STATE_WAITING, write.state);
2575 std::array<uint8_t, 128> buf;
2576 memset(buf.data(), 'a', buf.size());
2578 std::array<uint8_t, 3> readBuf;
2581 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
2582 acceptedSocket->write(buf.data(), buf.size());
2583 acceptedSocket->flush();
2584 acceptedSocket->readAll(readBuf.data(), readBuf.size());
2585 acceptedSocket->close();
2591 EXPECT_EQ(STATE_SUCCEEDED, ccb.state);
2592 EXPECT_EQ(STATE_SUCCEEDED, write.state);
2594 EXPECT_EQ(0, memcmp(readBuf.data(), sendBuf->data(), readBuf.size()));
2595 EXPECT_EQ(STATE_SUCCEEDED, rcb.state);
2596 ASSERT_EQ(1, rcb.buffers.size());
2597 ASSERT_EQ(sizeof(buf), rcb.buffers[0].length);
2598 EXPECT_EQ(0, memcmp(rcb.buffers[0].buffer, buf.data(), buf.size()));
2599 EXPECT_EQ(socket->getTFOFinished(), socket->getTFOSucceded());
// The mocked tfoSendMsg() performs a plain connect() to 127.0.0.1:65535,
// where no listener is expected. Both queued writes are expected to fail
// while the connect callback itself reports success.
// NOTE(review): the declarations of `evb` and `cb`, the tail of the mock
// lambda, the body of the getTFOFinished() branch and the event-loop run are
// not visible in this listing.
2602 TEST(AsyncSocketTest, ConnectRefusedDelayedTFO) {
2605 auto socket = MockAsyncTFOSocket::UniquePtr(new MockAsyncTFOSocket(&evb));
2606 socket->enableTFO();
2608 // Hopefully this fails
2609 folly::SocketAddress fakeAddr("127.0.0.1", 65535);
// Replace the TFO send with a regular connect() on the same fd.
2610 EXPECT_CALL(*socket, tfoSendMsg(_, _, _))
2611 .WillOnce(Invoke([&](int fd, struct msghdr*, int) {
2612 sockaddr_storage addr;
2613 auto len = fakeAddr.getAddress(&addr);
2614 int ret = connect(fd, (const struct sockaddr*)&addr, len);
2615 LOG(INFO) << "connecting the socket " << fd << " : " << ret << " : "
2620 // Hopefully nothing is actually listening on this address
2622 socket->connect(&cb, fakeAddr, 30);
2624 WriteCallback write1;
2625 // Trigger the connect if TFO attempt is supported.
2626 socket->writeChain(&write1, IOBuf::copyBuffer("hey"));
2628 if (socket->getTFOFinished()) {
2629 // This test is useless now.
2632 WriteCallback write2;
2633 // Trigger the connect if TFO attempt is supported.
2634 socket->writeChain(&write2, IOBuf::copyBuffer("hey"));
2637 EXPECT_EQ(STATE_FAILED, write1.state);
2638 EXPECT_EQ(STATE_FAILED, write2.state);
2639 EXPECT_FALSE(socket->getTFOSucceded());
2641 EXPECT_EQ(STATE_SUCCEEDED, cb.state);
2642 EXPECT_LE(0, socket->getConnectTime().count());
2643 EXPECT_EQ(std::chrono::milliseconds(30), socket->getConnectTimeout());
2644 EXPECT_TRUE(socket->getTFOAttempted());
// The mocked tfoSendMsg() fails with EOPNOTSUPP while the target is a public
// address on port 65535 that is hoped to be unresponsive; with a connect
// timeout of 1 the write is expected to fail.
// NOTE(review): the final alternative of the host-selection expression, the
// declarations of `evb`, `ccb`, `rcb` and the event-loop run are not visible
// in this listing.
2647 TEST(AsyncSocketTest, TestTFOUnsupportedTimeout) {
2648 // Try connecting to server that won't respond.
2650 // This depends somewhat on the network where this test is run.
2651 // Hopefully this IP will be routable but unresponsive.
2652 // (Alternatively, we could try listening on a local raw socket, but that
2653 // normally requires root privileges.)
// Prefer the IPv6 address when IPv6 is enabled, otherwise try IPv4.
2654 auto host = SocketAddressTestHelper::isIPv6Enabled()
2655 ? SocketAddressTestHelper::kGooglePublicDnsAAddrIPv6
2656 : SocketAddressTestHelper::isIPv4Enabled()
2657 ? SocketAddressTestHelper::kGooglePublicDnsAAddrIPv4
2659 SocketAddress addr(host, 65535);
2661 // Connect using an AsyncSocket
2663 auto socket = MockAsyncTFOSocket::UniquePtr(new MockAsyncTFOSocket(&evb));
2664 socket->enableTFO();
2667 // Set a very small timeout
2668 socket->connect(&ccb, addr, 1);
2669 EXPECT_EQ(STATE_SUCCEEDED, ccb.state);
2672 socket->setReadCB(&rcb);
2674 EXPECT_CALL(*socket, tfoSendMsg(_, _, _))
2675 .WillOnce(SetErrnoAndReturn(EOPNOTSUPP, -1));
2676 WriteCallback write;
2677 socket->writeChain(&write, IOBuf::copyBuffer("hey"));
2681 EXPECT_EQ(STATE_FAILED, write.state);
// The mocked tfoSendMsg() performs a plain connect() to the test server
// instead of a TFO send. The write is expected to be in STATE_WAITING right
// after writeChain() and to succeed later, with data flowing both ways.
// NOTE(review): the declarations of `evb`, `ccb`, `rcb`, the code that runs
// the server-side statements and the event-loop run are not visible in this
// listing.
2684 TEST(AsyncSocketTest, TestTFOFallbackToConnect) {
2685 TestServer server(true);
2687 // Connect using an AsyncSocket
2689 auto socket = MockAsyncTFOSocket::UniquePtr(new MockAsyncTFOSocket(&evb));
2690 socket->enableTFO();
2693 socket->connect(&ccb, server.getAddress(), 30);
2694 ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
2697 socket->setReadCB(&rcb);
// Replace the TFO send with a regular connect() to the server address.
2699 EXPECT_CALL(*socket, tfoSendMsg(_, _, _))
2700 .WillOnce(Invoke([&](int fd, struct msghdr*, int) {
2701 sockaddr_storage addr;
2702 auto len = server.getAddress().getAddress(&addr);
2703 return connect(fd, (const struct sockaddr*)&addr, len);
2705 WriteCallback write;
2706 auto sendBuf = IOBuf::copyBuffer("hey");
2707 socket->writeChain(&write, sendBuf->clone());
2708 EXPECT_EQ(STATE_WAITING, write.state);
2710 std::array<uint8_t, 128> buf;
2711 memset(buf.data(), 'a', buf.size());
2713 std::array<uint8_t, 3> readBuf;
2716 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
2717 acceptedSocket->write(buf.data(), buf.size());
2718 acceptedSocket->flush();
2719 acceptedSocket->readAll(readBuf.data(), readBuf.size());
2720 acceptedSocket->close();
2726 EXPECT_EQ(0, memcmp(readBuf.data(), sendBuf->data(), readBuf.size()));
2728 EXPECT_EQ(STATE_SUCCEEDED, ccb.state);
2729 EXPECT_EQ(STATE_SUCCEEDED, write.state);
2731 EXPECT_EQ(STATE_SUCCEEDED, rcb.state);
2732 ASSERT_EQ(1, rcb.buffers.size());
2733 ASSERT_EQ(buf.size(), rcb.buffers[0].length);
2734 EXPECT_EQ(0, memcmp(rcb.buffers[0].buffer, buf.data(), buf.size()));
// The mocked tfoSendMsg() performs a plain connect() to a public address on
// port 65535 that is hoped to be unresponsive; with a connect timeout of 1
// the write is expected to fail.
// NOTE(review): the final alternative of the host-selection expression, the
// declarations of `evb`, `ccb`, `rcb` and the event-loop run are not visible
// in this listing.
2737 TEST(AsyncSocketTest, TestTFOFallbackTimeout) {
2738 // Try connecting to server that won't respond.
2740 // This depends somewhat on the network where this test is run.
2741 // Hopefully this IP will be routable but unresponsive.
2742 // (Alternatively, we could try listening on a local raw socket, but that
2743 // normally requires root privileges.)
2744 auto host = SocketAddressTestHelper::isIPv6Enabled()
2745 ? SocketAddressTestHelper::kGooglePublicDnsAAddrIPv6
2746 : SocketAddressTestHelper::isIPv4Enabled()
2747 ? SocketAddressTestHelper::kGooglePublicDnsAAddrIPv4
2749 SocketAddress addr(host, 65535);
2751 // Connect using an AsyncSocket
2753 auto socket = MockAsyncTFOSocket::UniquePtr(new MockAsyncTFOSocket(&evb));
2754 socket->enableTFO();
2757 // Set a very small timeout
2758 socket->connect(&ccb, addr, 1);
2759 EXPECT_EQ(STATE_SUCCEEDED, ccb.state);
2762 socket->setReadCB(&rcb);
// Replace the TFO send with a regular connect() to the unresponsive address.
2764 EXPECT_CALL(*socket, tfoSendMsg(_, _, _))
2765 .WillOnce(Invoke([&](int fd, struct msghdr*, int) {
2766 sockaddr_storage addr2;
2767 auto len = addr.getAddress(&addr2);
2768 return connect(fd, (const struct sockaddr*)&addr2, len);
2770 WriteCallback write;
2771 socket->writeChain(&write, IOBuf::copyBuffer("hey"));
2775 EXPECT_EQ(STATE_FAILED, write.state);
// The mocked tfoSendMsg() fails once with EAGAIN. The connect callback is
// expected to report success while the write is expected to fail.
// NOTE(review): the declarations of `evb` and `ccb` and the event-loop run
// are not visible in this listing.
2778 TEST(AsyncSocketTest, TestTFOEagain) {
2779 TestServer server(true);
2781 // Connect using an AsyncSocket
2783 auto socket = MockAsyncTFOSocket::UniquePtr(new MockAsyncTFOSocket(&evb));
2784 socket->enableTFO();
2787 socket->connect(&ccb, server.getAddress(), 30);
2789 EXPECT_CALL(*socket, tfoSendMsg(_, _, _))
2790 .WillOnce(SetErrnoAndReturn(EAGAIN, -1));
2791 WriteCallback write;
2792 socket->writeChain(&write, IOBuf::copyBuffer("hey"));
2796 EXPECT_EQ(STATE_SUCCEEDED, ccb.state);
2797 EXPECT_EQ(STATE_FAILED, write.state);
2800 // Sending a large amount of data in the first write which will
2801 // definitely not fit into MSS.
// The client's first write is a 10 KiB IOBuf; the server reads all of it and
// sends 128 bytes of 'a' back.
// NOTE(review): the declarations of `evb`, `cb`, `rcb`, the code that runs
// the server-side statements and the event-loop run are not visible in this
// listing.
2802 TEST(AsyncSocketTest, ConnectTFOWithBigData) {
2803 // Start listening on a local port
2804 TestServer server(true);
2806 // Connect using an AsyncSocket
2808 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
2809 socket->enableTFO();
2811 socket->connect(&cb, server.getAddress(), 30);
2813 std::array<uint8_t, 128> buf;
2814 memset(buf.data(), 'a', buf.size());
// 10 KiB payload; append(len) marks the whole capacity as data without
// writing to it here.
2816 constexpr size_t len = 10 * 1024;
2817 auto sendBuf = IOBuf::create(len);
2818 sendBuf->append(len);
2819 std::array<uint8_t, len> readBuf;
2822 auto acceptedSocket = server.accept();
2823 acceptedSocket->write(buf.data(), buf.size());
2824 acceptedSocket->flush();
2825 acceptedSocket->readAll(readBuf.data(), readBuf.size());
2826 acceptedSocket->close();
2831 ASSERT_EQ(cb.state, STATE_SUCCEEDED);
2832 EXPECT_LE(0, socket->getConnectTime().count());
2833 EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
2834 EXPECT_TRUE(socket->getTFOAttempted());
2836 // Should trigger the connect
2837 WriteCallback write;
2839 socket->writeChain(&write, sendBuf->clone());
2840 socket->setReadCB(&rcb);
2845 EXPECT_EQ(STATE_SUCCEEDED, write.state);
2846 EXPECT_EQ(0, memcmp(readBuf.data(), sendBuf->data(), readBuf.size()));
2847 EXPECT_EQ(STATE_SUCCEEDED, rcb.state);
2848 ASSERT_EQ(1, rcb.buffers.size());
2849 ASSERT_EQ(sizeof(buf), rcb.buffers[0].length);
2850 EXPECT_EQ(0, memcmp(rcb.buffers[0].buffer, buf.data(), buf.size()));
2851 EXPECT_EQ(socket->getTFOFinished(), socket->getTFOSucceded());
2854 #endif // FOLLY_ALLOW_TFO
// gmock implementation of AsyncSocket::EvbChangeCallback; both notification
// methods take the AsyncSocket* they are reported for.
// NOTE(review): access specifier and closing brace are not visible in this
// listing.
2856 class MockEvbChangeCallback : public AsyncSocket::EvbChangeCallback {
2858 MOCK_METHOD1(evbAttached, void(AsyncSocket*));
2859 MOCK_METHOD1(evbDetached, void(AsyncSocket*));
// Expects the EvbChangeCallback to be notified exactly once for a detach and
// once for an attach when the socket is detached from and re-attached to its
// EventBase.
// NOTE(review): the declaration of `evb` is not visible in this listing.
2862 TEST(AsyncSocketTest, EvbCallbacks) {
2863 auto cb = std::make_unique<MockEvbChangeCallback>();
2865 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
// Expectations are set before ownership of the mock moves into the socket.
2868 EXPECT_CALL(*cb, evbDetached(socket.get())).Times(1);
2869 EXPECT_CALL(*cb, evbAttached(socket.get())).Times(1);
2871 socket->setEvbChangedCallback(std::move(cb));
2872 socket->detachEventBase();
2873 socket->attachEventBase(&evb);
// After a successful connect and with a read callback installed, the socket
// is still expected to be detachable, and detach/attach must notify the
// EvbChangeCallback exactly once each.
// NOTE(review): the declarations of `server`, `evb`, `cb`, `rcb` and the
// event-loop run are not visible in this listing.
2876 TEST(AsyncSocketTest, TestEvbDetachWtRegisteredIOHandlers) {
2877 // Start listening on a local port
2880 // Connect using an AsyncSocket
2882 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
2884 socket->connect(&cb, server.getAddress(), 30);
2888 ASSERT_EQ(cb.state, STATE_SUCCEEDED);
2889 EXPECT_LE(0, socket->getConnectTime().count());
2890 EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
2892 // After the ioHandlers are registered, still should be able to detach/attach
2894 socket->setReadCB(&rcb);
2896 auto cbEvbChg = std::make_unique<MockEvbChangeCallback>();
2898 EXPECT_CALL(*cbEvbChg, evbDetached(socket.get())).Times(1);
2899 EXPECT_CALL(*cbEvbChg, evbAttached(socket.get())).Times(1);
2901 socket->setEvbChangedCallback(std::move(cbEvbChg));
2902 EXPECT_TRUE(socket->isDetachable());
2903 socket->detachEventBase();
2904 socket->attachEventBase(&evb);
2909 #ifdef FOLLY_HAVE_MSG_ERRQUEUE
2910 /* copied from include/uapi/linux/net_tstamp.h */
2911 /* SO_TIMESTAMPING gets an integer bit field comprised of these values */
// Only the five flags that the ErrMessageCallback test ORs together are
// declared here; each is a distinct single-bit value.
2912 enum SOF_TIMESTAMPING {
2913 SOF_TIMESTAMPING_SOFTWARE = (1 << 4),
2914 SOF_TIMESTAMPING_OPT_ID = (1 << 7),
2915 SOF_TIMESTAMPING_TX_SCHED = (1 << 8),
2916 SOF_TIMESTAMPING_OPT_CMSG = (1 << 10),
2917 SOF_TIMESTAMPING_OPT_TSONLY = (1 << 11),
// ErrMessageCallback implementation used by the ErrMessageCallback test.
// errMessage() recognises two kinds of control messages — SCM_TIMESTAMPING at
// SOL_SOCKET level, and IP_RECVERR / IPV6_RECVERR at SOL_IP / SOL_IPV6 level —
// and calls checkResetCallback() after each.
// NOTE(review): the counter increments inside errMessage(), the body of
// errMessageError(), the declaration of gotByteSeq_, access specifiers and
// closing braces are not visible in this listing — confirm against the
// complete file.
2920 class TestErrMessageCallback : public folly::AsyncSocket::ErrMessageCallback {
// exception_ starts out as UNKNOWN / "none".
2922 TestErrMessageCallback()
2923 : exception_(folly::AsyncSocketException::UNKNOWN, "none") {}
2925 void errMessage(const cmsghdr& cmsg) noexcept override {
2926 if (cmsg.cmsg_level == SOL_SOCKET && cmsg.cmsg_type == SCM_TIMESTAMPING) {
2928 checkResetCallback();
2930 (cmsg.cmsg_level == SOL_IP && cmsg.cmsg_type == IP_RECVERR) ||
2931 (cmsg.cmsg_level == SOL_IPV6 && cmsg.cmsg_type == IPV6_RECVERR)) {
2933 checkResetCallback();
2937 void errMessageError(
2938 const folly::AsyncSocketException& ex) noexcept override {
// Clears the socket's ErrMessageCallback once the combined message count
// equals resetAfter_; does nothing when socket_ is null or resetAfter_ is -1.
2942 void checkResetCallback() noexcept {
2943 if (socket_ != nullptr && resetAfter_ != -1 &&
2944 gotTimestamp_ + gotByteSeq_ == resetAfter_) {
2945 socket_->setErrMessageCB(nullptr);
2949 folly::AsyncSocket* socket_{nullptr};
2950 folly::AsyncSocketException exception_;
2951 int gotTimestamp_{0};
// -1 disables the automatic callback reset.
2953 int resetAfter_{-1};
// Enables SO_TIMESTAMPING on a connected client socket, writes 128 bytes in
// two halves, and expects TestErrMessageCallback to have seen both timestamp
// and byte-sequence messages, with no error recorded. resetAfter_ is set to 3
// so the callback unregisters itself after three messages.
// NOTE(review): the declarations of `server`, `evb`, `ccb`, `wcb`, the
// event-loop runs, the client-side close and the opening of the final
// assertion are not visible in this listing.
2956 TEST(AsyncSocketTest, ErrMessageCallback) {
2961 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
2964 socket->connect(&ccb, server.getAddress(), 30);
2965 LOG(INFO) << "Client socket fd=" << socket->getFd();
2970 ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
2972 // Set read callback to keep the socket subscribed for event
2973 // notifications. Though we're not planning to read anything from
2974 // this side of the connection.
2975 ReadCallback rcb(1);
2976 socket->setReadCB(&rcb);
2978 // Set up timestamp callbacks
2979 TestErrMessageCallback errMsgCB;
2980 socket->setErrMessageCB(&errMsgCB);
2981 ASSERT_EQ(socket->getErrMessageCallback(),
2982 static_cast<folly::AsyncSocket::ErrMessageCallback*>(&errMsgCB));
2984 errMsgCB.socket_ = socket.get();
2985 errMsgCB.resetAfter_ = 3;
2987 // Enable timestamp notifications
2988 ASSERT_GT(socket->getFd(), 0);
2989 int flags = SOF_TIMESTAMPING_OPT_ID
2990 | SOF_TIMESTAMPING_OPT_TSONLY
2991 | SOF_TIMESTAMPING_SOFTWARE
2992 | SOF_TIMESTAMPING_OPT_CMSG
2993 | SOF_TIMESTAMPING_TX_SCHED;
2994 AsyncSocket::OptionKey tstampingOpt = {SOL_SOCKET, SO_TIMESTAMPING};
2995 EXPECT_EQ(tstampingOpt.apply(socket->getFd(), flags), 0);
2998 std::vector<uint8_t> wbuf(128, 'a');
3000 // Send two packets to get two EOM notifications
3001 socket->write(&wcb, wbuf.data(), wbuf.size() / 2);
3002 socket->write(&wcb, wbuf.data() + wbuf.size() / 2, wbuf.size() / 2);
3004 // Accept the connection.
3005 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
3006 LOG(INFO) << "Server socket fd=" << acceptedSocket->getSocketFD();
3010 ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
3012 // Check that we can read the data that was written to the socket
// rbuf is one byte larger than wbuf, so a read returning exactly wbuf.size()
// bytes shows nothing extra arrived.
3013 std::vector<uint8_t> rbuf(1 + wbuf.size(), 0);
3014 uint32_t bytesRead = acceptedSocket->read(rbuf.data(), rbuf.size());
3015 ASSERT_TRUE(std::equal(wbuf.begin(), wbuf.end(), rbuf.begin()));
3016 ASSERT_EQ(bytesRead, wbuf.size());
3018 // Close both sockets
3019 acceptedSocket->close();
3022 ASSERT_TRUE(socket->isClosedBySelf());
3023 ASSERT_FALSE(socket->isClosedByPeer());
3025 // Check for the timestamp notifications.
3026 ASSERT_EQ(errMsgCB.exception_.type_, folly::AsyncSocketException::UNKNOWN);
3027 ASSERT_GT(errMsgCB.gotByteSeq_, 0);
3028 ASSERT_GT(errMsgCB.gotTimestamp_, 0);
3030 errMsgCB.gotByteSeq_ + errMsgCB.gotTimestamp_, errMsgCB.resetAfter_);
3032 #endif // FOLLY_HAVE_MSG_ERRQUEUE
// The client sends "hello". A 2-byte peek callback reads "he", hands those
// bytes back via two setPreReceivedData() calls ("h" then "e"), and swaps in
// readCallback, which is expected to see the full "hello".
// NOTE(review): the declarations of `server` and `evb`, lambda closing braces
// and the event-loop runs are not visible in this listing.
3034 TEST(AsyncSocket, PreReceivedData) {
3038 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
3039 socket->connect(nullptr, server.getAddress(), 30);
3042 socket->writeChain(nullptr, IOBuf::copyBuffer("hello"));
3044 auto acceptedSocket = server.acceptAsync(&evb);
3046 ReadCallback peekCallback(2);
3047 ReadCallback readCallback;
3048 peekCallback.dataAvailableCallback = [&]() {
3049 peekCallback.verifyData("he", 2);
// Return the peeked bytes in two separate pieces.
3050 acceptedSocket->setPreReceivedData(IOBuf::copyBuffer("h"));
3051 acceptedSocket->setPreReceivedData(IOBuf::copyBuffer("e"));
3052 acceptedSocket->setReadCB(nullptr);
3053 acceptedSocket->setReadCB(&readCallback);
3055 readCallback.dataAvailableCallback = [&]() {
// Only verify and uninstall once all five bytes have been read.
3056 if (readCallback.dataRead() == 5) {
3057 readCallback.verifyData("hello", 5);
3058 acceptedSocket->setReadCB(nullptr);
3062 acceptedSocket->setReadCB(&peekCallback);
// The peek callback consumes the whole "hello", hands it back via
// setPreReceivedData(), and installs readCallback, which is expected to see
// "hello" again.
// NOTE(review): the declarations of `server` and `evb`, lambda closing braces
// and the event-loop runs are not visible in this listing.
3067 TEST(AsyncSocket, PreReceivedDataOnly) {
3071 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
3072 socket->connect(nullptr, server.getAddress(), 30);
3075 socket->writeChain(nullptr, IOBuf::copyBuffer("hello"));
3077 auto acceptedSocket = server.acceptAsync(&evb);
3079 ReadCallback peekCallback;
3080 ReadCallback readCallback;
3081 peekCallback.dataAvailableCallback = [&]() {
3082 peekCallback.verifyData("hello", 5);
3083 acceptedSocket->setPreReceivedData(IOBuf::copyBuffer("hello"));
3084 acceptedSocket->setReadCB(&readCallback);
3086 readCallback.dataAvailableCallback = [&]() {
3087 readCallback.verifyData("hello", 5);
3088 acceptedSocket->setReadCB(nullptr);
3091 acceptedSocket->setReadCB(&peekCallback);
// The peek callback consumes "hello" and hands it back via
// setPreReceivedData(). A 3-byte callback is then expected to see "hel", and
// a following normal callback the remaining "lo".
// NOTE(review): the declarations of `server` and `evb`, lambda closing braces
// and the event-loop runs are not visible in this listing.
3096 TEST(AsyncSocket, PreReceivedDataPartial) {
3100 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
3101 socket->connect(nullptr, server.getAddress(), 30);
3104 socket->writeChain(nullptr, IOBuf::copyBuffer("hello"));
3106 auto acceptedSocket = server.acceptAsync(&evb);
3108 ReadCallback peekCallback;
3109 ReadCallback smallReadCallback(3);
3110 ReadCallback normalReadCallback;
3111 peekCallback.dataAvailableCallback = [&]() {
3112 peekCallback.verifyData("hello", 5);
3113 acceptedSocket->setPreReceivedData(IOBuf::copyBuffer("hello"));
3114 acceptedSocket->setReadCB(&smallReadCallback);
3116 smallReadCallback.dataAvailableCallback = [&]() {
3117 smallReadCallback.verifyData("hel", 3);
3118 acceptedSocket->setReadCB(&normalReadCallback);
3120 normalReadCallback.dataAvailableCallback = [&]() {
3121 normalReadCallback.verifyData("lo", 2);
3122 acceptedSocket->setReadCB(nullptr);
3125 acceptedSocket->setReadCB(&peekCallback);
// A 3-byte peek callback reads "hel", "hello" is set as pre-received data on
// the accepted socket, and a new AsyncSocket is move-constructed from it.
// readCallback on the new socket is expected to see "hello".
// NOTE(review): the declarations of `server` and `evb`, the left-hand side of
// the takeover assignment, lambda closing braces and the event-loop runs are
// not visible in this listing.
3130 TEST(AsyncSocket, PreReceivedDataTakeover) {
3134 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
3135 socket->connect(nullptr, server.getAddress(), 30);
3138 socket->writeChain(nullptr, IOBuf::copyBuffer("hello"));
// The accepted socket is built directly from the raw accepted fd.
3140 auto acceptedSocket =
3141 AsyncSocket::UniquePtr(new AsyncSocket(&evb, server.acceptFD()));
3142 AsyncSocket::UniquePtr takeoverSocket;
3144 ReadCallback peekCallback(3);
3145 ReadCallback readCallback;
3146 peekCallback.dataAvailableCallback = [&]() {
3147 peekCallback.verifyData("hel", 3);
3148 acceptedSocket->setPreReceivedData(IOBuf::copyBuffer("hello"));
3149 acceptedSocket->setReadCB(nullptr);
3151 AsyncSocket::UniquePtr(new AsyncSocket(std::move(acceptedSocket)));
3152 takeoverSocket->setReadCB(&readCallback);
3154 readCallback.dataAvailableCallback = [&]() {
3155 readCallback.verifyData("hello", 5);
3156 takeoverSocket->setReadCB(nullptr);
3159 acceptedSocket->setReadCB(&peekCallback);
// Installs a TestSendMsgParamsCallback and performs two 128-byte writes: the
// first with MSG_DONTWAIT | MSG_NOSIGNAL, the second with MSG_MORE added.
// Both writes must report success and query the flags (but not ancillary
// data); the peer is expected to have received only the first write.
// NOTE(review): the declarations of `server`, `evb`, `ccb`, `wcb`, the
// event-loop runs and the client-side close are not visible in this listing.
3165 TEST(AsyncSocketTest, SendMessageFlags) {
3167 TestSendMsgParamsCallback sendMsgCB(
3168 MSG_DONTWAIT|MSG_NOSIGNAL|MSG_MORE, 0, nullptr);
3172 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
3175 socket->connect(&ccb, server.getAddress(), 30);
3176 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
3179 ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
3181 // Set SendMsgParamsCallback
3182 socket->setSendMsgParamCB(&sendMsgCB);
3183 ASSERT_EQ(socket->getSendMsgParamsCB(), &sendMsgCB);
3185 // Write the first portion of data. This data is expected to be
3186 // sent out immediately.
3187 std::vector<uint8_t> buf(128, 'a');
3189 sendMsgCB.reset(MSG_DONTWAIT | MSG_NOSIGNAL);
3190 socket->write(&wcb, buf.data(), buf.size());
3191 ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
3192 ASSERT_TRUE(sendMsgCB.queriedFlags_);
3193 ASSERT_FALSE(sendMsgCB.queriedData_);
3195 // Using different flags for the second write operation.
3196 // MSG_MORE flag is expected to delay sending this
3197 // data to the wire.
3198 sendMsgCB.reset(MSG_DONTWAIT | MSG_NOSIGNAL | MSG_MORE);
3199 socket->write(&wcb, buf.data(), buf.size());
3200 ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
3201 ASSERT_TRUE(sendMsgCB.queriedFlags_);
3202 ASSERT_FALSE(sendMsgCB.queriedData_);
3204 // Make sure the accepted socket saw only the data from
3205 // the first write request.
// readbuf could hold both writes, so a short read of buf.size() bytes shows
// the second write was not delivered yet.
3206 std::vector<uint8_t> readbuf(2 * buf.size());
3207 uint32_t bytesRead = acceptedSocket->read(readbuf.data(), readbuf.size());
3208 ASSERT_TRUE(std::equal(buf.begin(), buf.end(), readbuf.begin()));
3209 ASSERT_EQ(bytesRead, buf.size());
3211 // Close the accepted (server-side) socket
3212 acceptedSocket->close();
3215 ASSERT_TRUE(socket->isClosedBySelf());
3216 ASSERT_FALSE(socket->isClosedByPeer());
// Checks that ancillary data supplied through a SendMsgParamsCallback is
// delivered with a write: a temporary file's descriptor is attached as an
// SCM_RIGHTS control message over an AF_UNIX socket pair, received with
// recvmsg(), and the file contents are read back through the received
// descriptor.
// NOTE(review): the declarations of fds, cfd, sfd, evb, wcb, s_data, r_data,
// fd, msgh, iov and the s_u/r_u unions are not visible in this chunk.
3219 TEST(AsyncSocketTest, SendMessageAncillaryData) {
// NOTE(review): EXPECT_EQ is non-fatal, so a failed socketpair() would let
// the test continue; presumably cfd/sfd are taken from fds -- confirm.
3221 EXPECT_EQ(socketpair(AF_UNIX, SOCK_STREAM, 0, fds), 0);
3230 SCOPE_EXIT { close(sfd); };
3232 // Instantiate AsyncSocket object for the connected socket
3234 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb, cfd);
3236 // Open a temporary file and write a magic string to it
3237 // We'll transfer the file handle to test the message parameters
3239 TemporaryFile file(StringPiece(),
3241 TemporaryFile::Scope::UNLINK_IMMEDIATELY);
3242 int tmpfd = file.fd();
3243 ASSERT_NE(tmpfd, -1) << "Failed to open a temporary file";
3244 std::string magicString("Magic string");
// NOTE(review): write() returns a signed ssize_t while length() is an
// unsigned size_t, so this is a mixed-sign comparison.
3245 ASSERT_EQ(write(tmpfd, magicString.c_str(), magicString.length()),
3246 magicString.length());
3250 // Space large enough to hold an 'int'
3251 char control[CMSG_SPACE(sizeof(int))];
// Build a single SCM_RIGHTS control message whose payload is tmpfd.
3254 s_u.cmh.cmsg_len = CMSG_LEN(sizeof(int));
3255 s_u.cmh.cmsg_level = SOL_SOCKET;
3256 s_u.cmh.cmsg_type = SCM_RIGHTS;
3257 memcpy(CMSG_DATA(&s_u.cmh), &tmpfd, sizeof(int));
3259 // Set up the callback providing message parameters
// (send flags plus the control buffer built above and its size)
3260 TestSendMsgParamsCallback sendMsgCB(
3261 MSG_DONTWAIT | MSG_NOSIGNAL, sizeof(s_u.control), s_u.control);
3262 socket->setSendMsgParamCB(&sendMsgCB);
3264 // We must transmit at least 1 byte of real data in order
3265 // to send ancillary data
3268 socket->write(&wcb, &s_data, sizeof(s_data));
3269 ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
3271 // Receive the message
3273 // Space large enough to hold an 'int'
3274 char control[CMSG_SPACE(sizeof(int))];
// One iovec for the regular payload (r_data) and a control buffer for the
// ancillary message; no peer address is requested.
3281 msgh.msg_control = r_u.control;
3282 msgh.msg_controllen = sizeof(r_u.control);
3283 msgh.msg_name = nullptr;
3284 msgh.msg_namelen = 0;
3285 msgh.msg_iov = &iov;
3286 msgh.msg_iovlen = 1;
3287 iov.iov_base = &r_data;
3288 iov.iov_len = sizeof(r_data);
3291 ASSERT_NE(recvmsg(sfd, &msgh, 0), -1) << "recvmsg failed: " << errno;
3293 // Validate the received message
// The control header must match what was sent, and the data payload must
// round-trip unchanged.
3294 ASSERT_EQ(r_u.cmh.cmsg_len, CMSG_LEN(sizeof(int)));
3295 ASSERT_EQ(r_u.cmh.cmsg_level, SOL_SOCKET);
3296 ASSERT_EQ(r_u.cmh.cmsg_type, SCM_RIGHTS);
3297 ASSERT_EQ(r_data, s_data);
// Extract the transferred descriptor from the control message payload and
// make sure it is closed when the test scope exits.
3299 memcpy(&fd, CMSG_DATA(&r_u.cmh), sizeof(int));
3301 SCOPE_EXIT { close(fd); };
// One byte larger than the magic string, so the read below (which requests
// the whole buffer) returning exactly length() bytes shows the file holds
// nothing beyond the magic string.
3303 std::vector<uint8_t> transferredMagicString(magicString.length() + 1, 0);
3305 // Reposition to the beginning of the file
// (presumably fd shares its file offset with tmpfd, which the earlier
// write() advanced -- confirm against SCM_RIGHTS semantics)
3306 ASSERT_EQ(0, lseek(fd, 0, SEEK_SET));
3308 // Read the magic string back, and compare it with the original
3310 magicString.length(),
3311 read(fd, transferredMagicString.data(), transferredMagicString.size()));
3312 ASSERT_TRUE(std::equal(
3313 magicString.begin(),
3315 transferredMagicString.begin()));
// NOTE(review): the declarations of fd, evb, wcb, msgh, iov, recv_data and
// the r_u union are not visible in this chunk; comments describe only the
// visible statements.
3318 TEST(AsyncSocketTest, UnixDomainSocketErrMessageCB) {
3319 // In the latest stable kernel 4.14.3 as of 2017-12-04, Unix Domain
3320 // Socket (UDS) does not support MSG_ERRQUEUE. So
3321 // recvmsg(MSG_ERRQUEUE) will read application data from UDS which
3322 // breaks application message flow. To avoid this problem,
3323 // AsyncSocket currently disables setErrMessageCB for UDS.
3325 // This tests two things for UDS
3326 // 1. setErrMessageCB fails
3327 // 2. recvmsg(MSG_ERRQUEUE) reads application data
3329 // Feel free to remove this test if UDS supports MSG_ERRQUEUE in the future.
3332 EXPECT_EQ(socketpair(AF_UNIX, SOCK_STREAM, 0, fd), 0);
3333 ASSERT_NE(fd[0], -1);
3334 ASSERT_NE(fd[1], -1);
// Both ends are made non-blocking, so the recvmsg() on an empty socket
// below returns an error instead of blocking.
3339 EXPECT_EQ(fcntl(fd[0], F_SETFL, O_NONBLOCK), 0);
3340 EXPECT_EQ(fcntl(fd[1], F_SETFL, O_NONBLOCK), 0);
// fd[0] is wrapped by the AsyncSocket; fd[1] is used directly with recvmsg().
3343 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb, fd[0]);
3345 // setErrMessageCB should fail for unix domain socket
3346 TestErrMessageCallback errMsgCB;
// NOTE(review): the address of a local object is never null, so this
// assertion cannot fail.
3347 ASSERT_NE(&errMsgCB, nullptr);
3348 socket->setErrMessageCB(&errMsgCB);
// The callback must not have been installed.
3349 ASSERT_EQ(socket->getErrMessageCallback(), nullptr);
3351 #ifdef FOLLY_HAVE_MSG_ERRQUEUE
3352 // The following verifies that MSG_ERRQUEUE does not work for UDS,
3353 // and recvmsg reads application data
3355 // Space large enough to hold an 'int'
3356 char control[CMSG_SPACE(sizeof(int))];
// One iovec for the payload (recv_data) plus a control buffer; no peer
// address is requested.
3363 msgh.msg_control = r_u.control;
3364 msgh.msg_controllen = sizeof(r_u.control);
3365 msgh.msg_name = nullptr;
3366 msgh.msg_namelen = 0;
3367 msgh.msg_iov = &iov;
3368 msgh.msg_iovlen = 1;
3369 iov.iov_base = &recv_data;
3370 iov.iov_len = sizeof(recv_data);
3372 // Nothing has been written yet, so recvmsg should fail
3373 EXPECT_EQ(recvmsg(fd[1], &msgh, MSG_ERRQUEUE), -1);
3374 EXPECT_TRUE(errno == EAGAIN || errno == EWOULDBLOCK);
3376 // Send some application data. A real error queue would stay empty here,
3377 // but on UDS recvmsg(MSG_ERRQUEUE) returns the application data instead.
3378 int test_data = 123456;
3380 socket->write(&wcb, &test_data, sizeof(test_data));
// The MSG_ERRQUEUE read succeeds and yields the bytes written above.
3382 ASSERT_NE(recvmsg(fd[1], &msgh, MSG_ERRQUEUE), -1);
3383 ASSERT_EQ(recv_data, test_data);
3384 #endif // FOLLY_HAVE_MSG_ERRQUEUE