2 * Copyright 2016 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 #include <folly/ExceptionWrapper.h>
17 #include <folly/RWSpinLock.h>
18 #include <folly/Random.h>
19 #include <folly/SocketAddress.h>
20 #include <folly/io/async/AsyncServerSocket.h>
21 #include <folly/io/async/AsyncSocket.h>
22 #include <folly/io/async/AsyncTimeout.h>
23 #include <folly/io/async/EventBase.h>
25 #include <folly/io/IOBuf.h>
26 #include <folly/io/async/test/AsyncSocketTest.h>
27 #include <folly/io/async/test/Util.h>
28 #include <folly/portability/GMock.h>
29 #include <folly/portability/GTest.h>
30 #include <folly/portability/Sockets.h>
31 #include <folly/portability/Unistd.h>
32 #include <folly/test/SocketAddressTestHelper.h>
34 #include <boost/scoped_array.hpp>
36 #include <sys/types.h>
40 using namespace boost;
47 using std::unique_ptr;
48 using std::chrono::milliseconds;
49 using boost::scoped_array;
51 using namespace folly;
52 using namespace testing;
54 namespace fsp = folly::portability::sockets;
56 class DelayedWrite: public AsyncTimeout {
58 DelayedWrite(const std::shared_ptr<AsyncSocket>& socket,
59 unique_ptr<IOBuf>&& bufs, AsyncTransportWrapper::WriteCallback* wcb,
60 bool cork, bool lastWrite = false):
61 AsyncTimeout(socket->getEventBase()),
63 bufs_(std::move(bufs)),
66 lastWrite_(lastWrite) {}
69 void timeoutExpired() noexcept override {
70 WriteFlags flags = cork_ ? WriteFlags::CORK : WriteFlags::NONE;
71 socket_->writeChain(wcb_, std::move(bufs_), flags);
73 socket_->shutdownWrite();
77 std::shared_ptr<AsyncSocket> socket_;
78 unique_ptr<IOBuf> bufs_;
79 AsyncTransportWrapper::WriteCallback* wcb_;
84 ///////////////////////////////////////////////////////////////////////////
86 ///////////////////////////////////////////////////////////////////////////
89 * Test connecting to a server
91 TEST(AsyncSocketTest, Connect) {
92 // Start listening on a local port
95 // Connect using a AsyncSocket
97 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
99 socket->connect(&cb, server.getAddress(), 30);
103 ASSERT_EQ(cb.state, STATE_SUCCEEDED);
104 EXPECT_LE(0, socket->getConnectTime().count());
105 EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
108 enum class TFOState {
113 class AsyncSocketConnectTest : public ::testing::TestWithParam<TFOState> {};
115 std::vector<TFOState> getTestingValues() {
116 std::vector<TFOState> vals;
117 vals.emplace_back(TFOState::DISABLED);
120 vals.emplace_back(TFOState::ENABLED);
125 INSTANTIATE_TEST_CASE_P(
127 AsyncSocketConnectTest,
128 ::testing::ValuesIn(getTestingValues()));
131 * Test connecting to a server that isn't listening
133 TEST(AsyncSocketTest, ConnectRefused) {
136 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
138 // Hopefully nothing is actually listening on this address
139 folly::SocketAddress addr("127.0.0.1", 65535);
141 socket->connect(&cb, addr, 30);
145 EXPECT_EQ(STATE_FAILED, cb.state);
146 EXPECT_EQ(AsyncSocketException::NOT_OPEN, cb.exception.getType());
147 EXPECT_LE(0, socket->getConnectTime().count());
148 EXPECT_EQ(std::chrono::milliseconds(30), socket->getConnectTimeout());
152 * Test connection timeout
154 TEST(AsyncSocketTest, ConnectTimeout) {
157 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
159 // Try connecting to server that won't respond.
161 // This depends somewhat on the network where this test is run.
162 // Hopefully this IP will be routable but unresponsive.
163 // (Alternatively, we could try listening on a local raw socket, but that
164 // normally requires root privileges.)
166 SocketAddressTestHelper::isIPv6Enabled() ?
167 SocketAddressTestHelper::kGooglePublicDnsAAddrIPv6 :
168 SocketAddressTestHelper::isIPv4Enabled() ?
169 SocketAddressTestHelper::kGooglePublicDnsAAddrIPv4 :
171 SocketAddress addr(host, 65535);
173 socket->connect(&cb, addr, 1); // also set a ridiculously small timeout
177 ASSERT_EQ(cb.state, STATE_FAILED);
178 ASSERT_EQ(cb.exception.getType(), AsyncSocketException::TIMED_OUT);
180 // Verify that we can still get the peer address after a timeout.
181 // Use case is if the client was created from a client pool, and we want
182 // to log which peer failed.
183 folly::SocketAddress peer;
184 socket->getPeerAddress(&peer);
185 ASSERT_EQ(peer, addr);
186 EXPECT_LE(0, socket->getConnectTime().count());
187 EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(1));
191 * Test writing immediately after connecting, without waiting for connect
194 TEST_P(AsyncSocketConnectTest, ConnectAndWrite) {
199 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
201 if (GetParam() == TFOState::ENABLED) {
206 socket->connect(&ccb, server.getAddress(), 30);
210 memset(buf, 'a', sizeof(buf));
212 socket->write(&wcb, buf, sizeof(buf));
214 // Loop. We don't bother accepting on the server socket yet.
215 // The kernel should be able to buffer the write request so it can succeed.
218 ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
219 ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
221 // Make sure the server got a connection and received the data
223 server.verifyConnection(buf, sizeof(buf));
225 ASSERT_TRUE(socket->isClosedBySelf());
226 ASSERT_FALSE(socket->isClosedByPeer());
227 EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
231 * Test connecting using a nullptr connect callback.
233 TEST_P(AsyncSocketConnectTest, ConnectNullCallback) {
238 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
239 if (GetParam() == TFOState::ENABLED) {
243 socket->connect(nullptr, server.getAddress(), 30);
245 // write some data, just so we have some way of verifing
246 // that the socket works correctly after connecting
248 memset(buf, 'a', sizeof(buf));
250 socket->write(&wcb, buf, sizeof(buf));
254 ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
256 // Make sure the server got a connection and received the data
258 server.verifyConnection(buf, sizeof(buf));
260 ASSERT_TRUE(socket->isClosedBySelf());
261 ASSERT_FALSE(socket->isClosedByPeer());
265 * Test calling both write() and close() immediately after connecting, without
266 * waiting for connect to finish.
268 * This exercises the STATE_CONNECTING_CLOSING code.
270 TEST_P(AsyncSocketConnectTest, ConnectWriteAndClose) {
275 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
276 if (GetParam() == TFOState::ENABLED) {
280 socket->connect(&ccb, server.getAddress(), 30);
284 memset(buf, 'a', sizeof(buf));
286 socket->write(&wcb, buf, sizeof(buf));
291 // Loop. We don't bother accepting on the server socket yet.
292 // The kernel should be able to buffer the write request so it can succeed.
295 ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
296 ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
298 // Make sure the server got a connection and received the data
299 server.verifyConnection(buf, sizeof(buf));
301 ASSERT_TRUE(socket->isClosedBySelf());
302 ASSERT_FALSE(socket->isClosedByPeer());
306 * Test calling close() immediately after connect()
308 TEST(AsyncSocketTest, ConnectAndClose) {
311 // Connect using a AsyncSocket
313 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
315 socket->connect(&ccb, server.getAddress(), 30);
317 // Hopefully the connect didn't succeed immediately.
318 // If it did, we can't exercise the close-while-connecting code path.
319 if (ccb.state == STATE_SUCCEEDED) {
320 LOG(INFO) << "connect() succeeded immediately; aborting test "
321 "of close-during-connect behavior";
327 // Loop, although there shouldn't be anything to do.
330 // Make sure the connection was aborted
331 ASSERT_EQ(ccb.state, STATE_FAILED);
333 ASSERT_TRUE(socket->isClosedBySelf());
334 ASSERT_FALSE(socket->isClosedByPeer());
338 * Test calling closeNow() immediately after connect()
340 * This should be identical to the normal close behavior.
342 TEST(AsyncSocketTest, ConnectAndCloseNow) {
345 // Connect using a AsyncSocket
347 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
349 socket->connect(&ccb, server.getAddress(), 30);
351 // Hopefully the connect didn't succeed immediately.
352 // If it did, we can't exercise the close-while-connecting code path.
353 if (ccb.state == STATE_SUCCEEDED) {
354 LOG(INFO) << "connect() succeeded immediately; aborting test "
355 "of closeNow()-during-connect behavior";
361 // Loop, although there shouldn't be anything to do.
364 // Make sure the connection was aborted
365 ASSERT_EQ(ccb.state, STATE_FAILED);
367 ASSERT_TRUE(socket->isClosedBySelf());
368 ASSERT_FALSE(socket->isClosedByPeer());
372 * Test calling both write() and closeNow() immediately after connecting,
373 * without waiting for connect to finish.
375 * This should abort the pending write.
377 TEST(AsyncSocketTest, ConnectWriteAndCloseNow) {
382 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
384 socket->connect(&ccb, server.getAddress(), 30);
386 // Hopefully the connect didn't succeed immediately.
387 // If it did, we can't exercise the close-while-connecting code path.
388 if (ccb.state == STATE_SUCCEEDED) {
389 LOG(INFO) << "connect() succeeded immediately; aborting test "
390 "of write-during-connect behavior";
396 memset(buf, 'a', sizeof(buf));
398 socket->write(&wcb, buf, sizeof(buf));
403 // Loop, although there shouldn't be anything to do.
406 ASSERT_EQ(ccb.state, STATE_FAILED);
407 ASSERT_EQ(wcb.state, STATE_FAILED);
409 ASSERT_TRUE(socket->isClosedBySelf());
410 ASSERT_FALSE(socket->isClosedByPeer());
414 * Test installing a read callback immediately, before connect() finishes.
416 TEST_P(AsyncSocketConnectTest, ConnectAndRead) {
421 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
422 if (GetParam() == TFOState::ENABLED) {
427 socket->connect(&ccb, server.getAddress(), 30);
430 socket->setReadCB(&rcb);
432 if (GetParam() == TFOState::ENABLED) {
433 // Trigger a connection
434 socket->writeChain(nullptr, IOBuf::copyBuffer("hey"));
437 // Even though we haven't looped yet, we should be able to accept
438 // the connection and send data to it.
439 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
441 memset(buf, 'a', sizeof(buf));
442 acceptedSocket->write(buf, sizeof(buf));
443 acceptedSocket->flush();
444 acceptedSocket->close();
446 // Loop, although there shouldn't be anything to do.
449 ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
450 ASSERT_EQ(rcb.buffers.size(), 1);
451 ASSERT_EQ(rcb.buffers[0].length, sizeof(buf));
452 ASSERT_EQ(memcmp(rcb.buffers[0].buffer, buf, sizeof(buf)), 0);
454 ASSERT_FALSE(socket->isClosedBySelf());
455 ASSERT_FALSE(socket->isClosedByPeer());
459 * Test installing a read callback and then closing immediately before the
460 * connect attempt finishes.
462 TEST(AsyncSocketTest, ConnectReadAndClose) {
467 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
469 socket->connect(&ccb, server.getAddress(), 30);
471 // Hopefully the connect didn't succeed immediately.
472 // If it did, we can't exercise the close-while-connecting code path.
473 if (ccb.state == STATE_SUCCEEDED) {
474 LOG(INFO) << "connect() succeeded immediately; aborting test "
475 "of read-during-connect behavior";
480 socket->setReadCB(&rcb);
485 // Loop, although there shouldn't be anything to do.
488 ASSERT_EQ(ccb.state, STATE_FAILED); // we aborted the close attempt
489 ASSERT_EQ(rcb.buffers.size(), 0);
490 ASSERT_EQ(rcb.state, STATE_SUCCEEDED); // this indicates EOF
492 ASSERT_TRUE(socket->isClosedBySelf());
493 ASSERT_FALSE(socket->isClosedByPeer());
497 * Test both writing and installing a read callback immediately,
498 * before connect() finishes.
500 TEST_P(AsyncSocketConnectTest, ConnectWriteAndRead) {
505 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
506 if (GetParam() == TFOState::ENABLED) {
510 socket->connect(&ccb, server.getAddress(), 30);
514 memset(buf1, 'a', sizeof(buf1));
516 socket->write(&wcb, buf1, sizeof(buf1));
518 // set a read callback
520 socket->setReadCB(&rcb);
522 // Even though we haven't looped yet, we should be able to accept
523 // the connection and send data to it.
524 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
526 memset(buf2, 'b', sizeof(buf2));
527 acceptedSocket->write(buf2, sizeof(buf2));
528 acceptedSocket->flush();
530 // shut down the write half of acceptedSocket, so that the AsyncSocket
531 // will stop reading and we can break out of the event loop.
532 shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
537 // Make sure the connect succeeded
538 ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
540 // Make sure the AsyncSocket read the data written by the accepted socket
541 ASSERT_EQ(rcb.state, STATE_SUCCEEDED);
542 ASSERT_EQ(rcb.buffers.size(), 1);
543 ASSERT_EQ(rcb.buffers[0].length, sizeof(buf2));
544 ASSERT_EQ(memcmp(rcb.buffers[0].buffer, buf2, sizeof(buf2)), 0);
546 // Close the AsyncSocket so we'll see EOF on acceptedSocket
549 // Make sure the accepted socket saw the data written by the AsyncSocket
550 uint8_t readbuf[sizeof(buf1)];
551 acceptedSocket->readAll(readbuf, sizeof(readbuf));
552 ASSERT_EQ(memcmp(buf1, readbuf, sizeof(buf1)), 0);
553 uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
554 ASSERT_EQ(bytesRead, 0);
556 ASSERT_FALSE(socket->isClosedBySelf());
557 ASSERT_TRUE(socket->isClosedByPeer());
561 * Test writing to the socket then shutting down writes before the connect
564 TEST(AsyncSocketTest, ConnectWriteAndShutdownWrite) {
569 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
571 socket->connect(&ccb, server.getAddress(), 30);
573 // Hopefully the connect didn't succeed immediately.
574 // If it did, we can't exercise the write-while-connecting code path.
575 if (ccb.state == STATE_SUCCEEDED) {
576 LOG(INFO) << "connect() succeeded immediately; skipping test";
580 // Ask to write some data
582 memset(wbuf, 'a', sizeof(wbuf));
584 socket->write(&wcb, wbuf, sizeof(wbuf));
585 socket->shutdownWrite();
588 socket->shutdownWrite();
590 // Even though we haven't looped yet, we should be able to accept
592 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
594 // Since the connection is still in progress, there should be no data to
595 // read yet. Verify that the accepted socket is not readable.
596 struct pollfd fds[1];
597 fds[0].fd = acceptedSocket->getSocketFD();
598 fds[0].events = POLLIN;
600 int rc = poll(fds, 1, 0);
603 // Write data to the accepted socket
604 uint8_t acceptedWbuf[192];
605 memset(acceptedWbuf, 'b', sizeof(acceptedWbuf));
606 acceptedSocket->write(acceptedWbuf, sizeof(acceptedWbuf));
607 acceptedSocket->flush();
612 // The loop should have completed the connection, written the queued data,
613 // and shutdown writes on the socket.
615 // Check that the connection was completed successfully and that the write
616 // callback succeeded.
617 ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
618 ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
620 // Check that we can read the data that was written to the socket, and that
621 // we see an EOF, since its socket was half-shutdown.
622 uint8_t readbuf[sizeof(wbuf)];
623 acceptedSocket->readAll(readbuf, sizeof(readbuf));
624 ASSERT_EQ(memcmp(wbuf, readbuf, sizeof(wbuf)), 0);
625 uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
626 ASSERT_EQ(bytesRead, 0);
628 // Close the accepted socket. This will cause it to see EOF
629 // and uninstall the read callback when we loop next.
630 acceptedSocket->close();
632 // Install a read callback, then loop again.
634 socket->setReadCB(&rcb);
637 // This loop should have read the data and seen the EOF
638 ASSERT_EQ(rcb.state, STATE_SUCCEEDED);
639 ASSERT_EQ(rcb.buffers.size(), 1);
640 ASSERT_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
641 ASSERT_EQ(memcmp(rcb.buffers[0].buffer,
642 acceptedWbuf, sizeof(acceptedWbuf)), 0);
644 ASSERT_FALSE(socket->isClosedBySelf());
645 ASSERT_FALSE(socket->isClosedByPeer());
649 * Test reading, writing, and shutting down writes before the connect attempt
652 TEST(AsyncSocketTest, ConnectReadWriteAndShutdownWrite) {
657 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
659 socket->connect(&ccb, server.getAddress(), 30);
661 // Hopefully the connect didn't succeed immediately.
662 // If it did, we can't exercise the write-while-connecting code path.
663 if (ccb.state == STATE_SUCCEEDED) {
664 LOG(INFO) << "connect() succeeded immediately; skipping test";
668 // Install a read callback
670 socket->setReadCB(&rcb);
672 // Ask to write some data
674 memset(wbuf, 'a', sizeof(wbuf));
676 socket->write(&wcb, wbuf, sizeof(wbuf));
679 socket->shutdownWrite();
681 // Even though we haven't looped yet, we should be able to accept
683 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
685 // Since the connection is still in progress, there should be no data to
686 // read yet. Verify that the accepted socket is not readable.
687 struct pollfd fds[1];
688 fds[0].fd = acceptedSocket->getSocketFD();
689 fds[0].events = POLLIN;
691 int rc = poll(fds, 1, 0);
694 // Write data to the accepted socket
695 uint8_t acceptedWbuf[192];
696 memset(acceptedWbuf, 'b', sizeof(acceptedWbuf));
697 acceptedSocket->write(acceptedWbuf, sizeof(acceptedWbuf));
698 acceptedSocket->flush();
699 // Shutdown writes to the accepted socket. This will cause it to see EOF
700 // and uninstall the read callback.
701 shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
706 // The loop should have completed the connection, written the queued data,
707 // shutdown writes on the socket, read the data we wrote to it, and see the
710 // Check that the connection was completed successfully and that the read
711 // and write callbacks were invoked as expected.
712 ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
713 ASSERT_EQ(rcb.state, STATE_SUCCEEDED);
714 ASSERT_EQ(rcb.buffers.size(), 1);
715 ASSERT_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
716 ASSERT_EQ(memcmp(rcb.buffers[0].buffer,
717 acceptedWbuf, sizeof(acceptedWbuf)), 0);
718 ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
720 // Check that we can read the data that was written to the socket, and that
721 // we see an EOF, since its socket was half-shutdown.
722 uint8_t readbuf[sizeof(wbuf)];
723 acceptedSocket->readAll(readbuf, sizeof(readbuf));
724 ASSERT_EQ(memcmp(wbuf, readbuf, sizeof(wbuf)), 0);
725 uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
726 ASSERT_EQ(bytesRead, 0);
728 // Fully close both sockets
729 acceptedSocket->close();
732 ASSERT_FALSE(socket->isClosedBySelf());
733 ASSERT_TRUE(socket->isClosedByPeer());
737 * Test reading, writing, and calling shutdownWriteNow() before the
738 * connect attempt finishes.
740 TEST(AsyncSocketTest, ConnectReadWriteAndShutdownWriteNow) {
745 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
747 socket->connect(&ccb, server.getAddress(), 30);
749 // Hopefully the connect didn't succeed immediately.
750 // If it did, we can't exercise the write-while-connecting code path.
751 if (ccb.state == STATE_SUCCEEDED) {
752 LOG(INFO) << "connect() succeeded immediately; skipping test";
756 // Install a read callback
758 socket->setReadCB(&rcb);
760 // Ask to write some data
762 memset(wbuf, 'a', sizeof(wbuf));
764 socket->write(&wcb, wbuf, sizeof(wbuf));
766 // Shutdown writes immediately.
767 // This should immediately discard the data that we just tried to write.
768 socket->shutdownWriteNow();
770 // Verify that writeError() was invoked on the write callback.
771 ASSERT_EQ(wcb.state, STATE_FAILED);
772 ASSERT_EQ(wcb.bytesWritten, 0);
774 // Even though we haven't looped yet, we should be able to accept
776 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
778 // Since the connection is still in progress, there should be no data to
779 // read yet. Verify that the accepted socket is not readable.
780 struct pollfd fds[1];
781 fds[0].fd = acceptedSocket->getSocketFD();
782 fds[0].events = POLLIN;
784 int rc = poll(fds, 1, 0);
787 // Write data to the accepted socket
788 uint8_t acceptedWbuf[192];
789 memset(acceptedWbuf, 'b', sizeof(acceptedWbuf));
790 acceptedSocket->write(acceptedWbuf, sizeof(acceptedWbuf));
791 acceptedSocket->flush();
792 // Shutdown writes to the accepted socket. This will cause it to see EOF
793 // and uninstall the read callback.
794 shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
799 // The loop should have completed the connection, written the queued data,
800 // shutdown writes on the socket, read the data we wrote to it, and see the
803 // Check that the connection was completed successfully and that the read
804 // callback was invoked as expected.
805 ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
806 ASSERT_EQ(rcb.state, STATE_SUCCEEDED);
807 ASSERT_EQ(rcb.buffers.size(), 1);
808 ASSERT_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
809 ASSERT_EQ(memcmp(rcb.buffers[0].buffer,
810 acceptedWbuf, sizeof(acceptedWbuf)), 0);
812 // Since we used shutdownWriteNow(), it should have discarded all pending
813 // write data. Verify we see an immediate EOF when reading from the accepted
815 uint8_t readbuf[sizeof(wbuf)];
816 uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
817 ASSERT_EQ(bytesRead, 0);
819 // Fully close both sockets
820 acceptedSocket->close();
823 ASSERT_FALSE(socket->isClosedBySelf());
824 ASSERT_TRUE(socket->isClosedByPeer());
827 // Helper function for use in testConnectOptWrite()
828 // Temporarily disable the read callback
829 void tmpDisableReads(AsyncSocket* socket, ReadCallback* rcb) {
830 // Uninstall the read callback
831 socket->setReadCB(nullptr);
832 // Schedule the read callback to be reinstalled after 1ms
833 socket->getEventBase()->runInLoop(
834 std::bind(&AsyncSocket::setReadCB, socket, rcb));
838 * Test connect+write, then have the connect callback perform another write.
840 * This tests interaction of the optimistic writing after connect with
841 * additional write attempts that occur in the connect callback.
843 void testConnectOptWrite(size_t size1, size_t size2, bool close = false) {
846 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
850 socket->connect(&ccb, server.getAddress(), 30);
852 // Hopefully the connect didn't succeed immediately.
853 // If it did, we can't exercise the optimistic write code path.
854 if (ccb.state == STATE_SUCCEEDED) {
855 LOG(INFO) << "connect() succeeded immediately; aborting test "
856 "of optimistic write behavior";
860 // Tell the connect callback to perform a write when the connect succeeds
862 scoped_array<char> buf2(new char[size2]);
863 memset(buf2.get(), 'b', size2);
865 ccb.successCallback = [&] { socket->write(&wcb2, buf2.get(), size2); };
866 // Tell the second write callback to close the connection when it is done
867 wcb2.successCallback = [&] { socket->closeNow(); };
870 // Schedule one write() immediately, before the connect finishes
871 scoped_array<char> buf1(new char[size1]);
872 memset(buf1.get(), 'a', size1);
875 socket->write(&wcb1, buf1.get(), size1);
879 // immediately perform a close, before connect() completes
883 // Start reading from the other endpoint after 10ms.
884 // If we're using large buffers, we have to read so that the writes don't
886 std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
888 rcb.dataAvailableCallback = std::bind(tmpDisableReads,
889 acceptedSocket.get(), &rcb);
890 socket->getEventBase()->tryRunAfterDelay(
891 std::bind(&AsyncSocket::setReadCB, acceptedSocket.get(), &rcb),
894 // Loop. We don't bother accepting on the server socket yet.
895 // The kernel should be able to buffer the write request so it can succeed.
898 ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
900 ASSERT_EQ(wcb1.state, STATE_SUCCEEDED);
903 ASSERT_EQ(wcb2.state, STATE_SUCCEEDED);
908 // Make sure the read callback received all of the data
909 size_t bytesRead = 0;
910 for (vector<ReadCallback::Buffer>::const_iterator it = rcb.buffers.begin();
911 it != rcb.buffers.end();
913 size_t start = bytesRead;
914 bytesRead += it->length;
915 size_t end = bytesRead;
917 size_t cmpLen = min(size1, end) - start;
918 ASSERT_EQ(memcmp(it->buffer, buf1.get() + start, cmpLen), 0);
920 if (end > size1 && end <= size1 + size2) {
924 if (start >= size1) {
926 buf2Offset = start - size1;
927 cmpLen = end - start;
929 itOffset = size1 - start;
931 cmpLen = end - size1;
933 ASSERT_EQ(memcmp(it->buffer + itOffset, buf2.get() + buf2Offset,
938 ASSERT_EQ(bytesRead, size1 + size2);
941 TEST(AsyncSocketTest, ConnectCallbackWrite) {
942 // Test using small writes that should both succeed immediately
943 testConnectOptWrite(100, 200);
945 // Test using a large buffer in the connect callback, that should block
946 const size_t largeSize = 8*1024*1024;
947 testConnectOptWrite(100, largeSize);
949 // Test using a large initial write
950 testConnectOptWrite(largeSize, 100);
952 // Test using two large buffers
953 testConnectOptWrite(largeSize, largeSize);
955 // Test a small write in the connect callback,
956 // but no immediate write before connect completes
957 testConnectOptWrite(0, 64);
959 // Test a large write in the connect callback,
960 // but no immediate write before connect completes
961 testConnectOptWrite(0, largeSize);
963 // Test connect, a small write, then immediately call close() before connect
965 testConnectOptWrite(211, 0, true);
967 // Test connect, a large immediate write (that will block), then immediately
968 // call close() before connect completes
969 testConnectOptWrite(largeSize, 0, true);
972 ///////////////////////////////////////////////////////////////////////////
973 // write() related tests
974 ///////////////////////////////////////////////////////////////////////////
977 * Test writing using a nullptr callback
979 TEST(AsyncSocketTest, WriteNullCallback) {
984 std::shared_ptr<AsyncSocket> socket =
985 AsyncSocket::newSocket(&evb, server.getAddress(), 30);
986 evb.loop(); // loop until the socket is connected
988 // write() with a nullptr callback
990 memset(buf, 'a', sizeof(buf));
991 socket->write(nullptr, buf, sizeof(buf));
993 evb.loop(); // loop until the data is sent
995 // Make sure the server got a connection and received the data
997 server.verifyConnection(buf, sizeof(buf));
999 ASSERT_TRUE(socket->isClosedBySelf());
1000 ASSERT_FALSE(socket->isClosedByPeer());
1004 * Test writing with a send timeout
1006 TEST(AsyncSocketTest, WriteTimeout) {
1011 std::shared_ptr<AsyncSocket> socket =
1012 AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1013 evb.loop(); // loop until the socket is connected
1015 // write() a large chunk of data, with no-one on the other end reading
1016 size_t writeLength = 8*1024*1024;
1017 uint32_t timeout = 200;
1018 socket->setSendTimeout(timeout);
1019 scoped_array<char> buf(new char[writeLength]);
1020 memset(buf.get(), 'a', writeLength);
1022 socket->write(&wcb, buf.get(), writeLength);
1028 // Make sure the write attempt timed out as requested
1029 ASSERT_EQ(wcb.state, STATE_FAILED);
1030 ASSERT_EQ(wcb.exception.getType(), AsyncSocketException::TIMED_OUT);
1032 // Check that the write timed out within a reasonable period of time.
1033 // We don't check for exactly the specified timeout, since AsyncSocket only
1034 // times out when it hasn't made progress for that period of time.
1036 // On linux, the first write sends a few hundred kb of data, then blocks for
1037 // writability, and then unblocks again after 40ms and is able to write
1038 // another smaller of data before blocking permanently. Therefore it doesn't
1039 // time out until 40ms + timeout.
1041 // I haven't fully verified the cause of this, but I believe it probably
1042 // occurs because the receiving end delays sending an ack for up to 40ms.
1043 // (This is the default value for TCP_DELACK_MIN.) Once the sender receives
1044 // the ack, it can send some more data. However, after that point the
1045 // receiver's kernel buffer is full. This 40ms delay happens even with
1046 // TCP_NODELAY and TCP_QUICKACK enabled on both endpoints. However, the
1047 // kernel may be automatically disabling TCP_QUICKACK after receiving some
1050 // For now, we simply check that the timeout occurred within 160ms of
1051 // the requested value.
1052 T_CHECK_TIMEOUT(start, end, milliseconds(timeout), milliseconds(160));
1056 * Test writing to a socket that the remote endpoint has closed
1058 TEST(AsyncSocketTest, WritePipeError) {
1063 std::shared_ptr<AsyncSocket> socket =
1064 AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1065 socket->setSendTimeout(1000);
1066 evb.loop(); // loop until the socket is connected
1068 // accept and immediately close the socket
1069 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
1070 acceptedSocket.reset();
1072 // write() a large chunk of data
1073 size_t writeLength = 8*1024*1024;
1074 scoped_array<char> buf(new char[writeLength]);
1075 memset(buf.get(), 'a', writeLength);
1077 socket->write(&wcb, buf.get(), writeLength);
1081 // Make sure the write failed.
1082 // It would be nice if AsyncSocketException could convey the errno value,
1083 // so that we could check for EPIPE
1084 ASSERT_EQ(wcb.state, STATE_FAILED);
1085 ASSERT_EQ(wcb.exception.getType(),
1086 AsyncSocketException::INTERNAL_ERROR);
1088 ASSERT_FALSE(socket->isClosedBySelf());
1089 ASSERT_FALSE(socket->isClosedByPeer());
1093 * Test writing a mix of simple buffers and IOBufs
1095 TEST(AsyncSocketTest, WriteIOBuf) {
1100 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
1102 socket->connect(&ccb, server.getAddress(), 30);
1104 // Accept the connection
1105 std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
1107 acceptedSocket->setReadCB(&rcb);
1109 // Write a simple buffer to the socket
1110 constexpr size_t simpleBufLength = 5;
1111 char simpleBuf[simpleBufLength];
1112 memset(simpleBuf, 'a', simpleBufLength);
1114 socket->write(&wcb, simpleBuf, simpleBufLength);
1116 // Write a single-element IOBuf chain
1117 size_t buf1Length = 7;
1118 unique_ptr<IOBuf> buf1(IOBuf::create(buf1Length));
1119 memset(buf1->writableData(), 'b', buf1Length);
1120 buf1->append(buf1Length);
1121 unique_ptr<IOBuf> buf1Copy(buf1->clone());
1123 socket->writeChain(&wcb2, std::move(buf1));
1125 // Write a multiple-element IOBuf chain
1126 size_t buf2Length = 11;
1127 unique_ptr<IOBuf> buf2(IOBuf::create(buf2Length));
1128 memset(buf2->writableData(), 'c', buf2Length);
1129 buf2->append(buf2Length);
1130 size_t buf3Length = 13;
1131 unique_ptr<IOBuf> buf3(IOBuf::create(buf3Length));
1132 memset(buf3->writableData(), 'd', buf3Length);
1133 buf3->append(buf3Length);
1134 buf2->appendChain(std::move(buf3));
1135 unique_ptr<IOBuf> buf2Copy(buf2->clone());
1136 buf2Copy->coalesce();
1138 socket->writeChain(&wcb3, std::move(buf2));
1139 socket->shutdownWrite();
1141 // Let the reads and writes run to completion
1144 ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
1145 ASSERT_EQ(wcb2.state, STATE_SUCCEEDED);
1146 ASSERT_EQ(wcb3.state, STATE_SUCCEEDED);
1148 // Make sure the reader got the right data in the right order
1149 ASSERT_EQ(rcb.state, STATE_SUCCEEDED);
1150 ASSERT_EQ(rcb.buffers.size(), 1);
1151 ASSERT_EQ(rcb.buffers[0].length,
1152 simpleBufLength + buf1Length + buf2Length + buf3Length);
1154 memcmp(rcb.buffers[0].buffer, simpleBuf, simpleBufLength), 0);
1156 memcmp(rcb.buffers[0].buffer + simpleBufLength,
1157 buf1Copy->data(), buf1Copy->length()), 0);
1159 memcmp(rcb.buffers[0].buffer + simpleBufLength + buf1Length,
1160 buf2Copy->data(), buf2Copy->length()), 0);
1162 acceptedSocket->close();
1165 ASSERT_TRUE(socket->isClosedBySelf());
1166 ASSERT_FALSE(socket->isClosedByPeer());
1169 TEST(AsyncSocketTest, WriteIOBufCorked) {
1174 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
1176 socket->connect(&ccb, server.getAddress(), 30);
1178 // Accept the connection
1179 std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
1181 acceptedSocket->setReadCB(&rcb);
1183 // Do three writes, 100ms apart, with the "cork" flag set
1184 // on the second write. The reader should see the first write
1185 // arrive by itself, followed by the second and third writes
1186 // arriving together.
1187 size_t buf1Length = 5;
1188 unique_ptr<IOBuf> buf1(IOBuf::create(buf1Length));
1189 memset(buf1->writableData(), 'a', buf1Length);
1190 buf1->append(buf1Length);
1191 size_t buf2Length = 7;
1192 unique_ptr<IOBuf> buf2(IOBuf::create(buf2Length));
1193 memset(buf2->writableData(), 'b', buf2Length);
1194 buf2->append(buf2Length);
1195 size_t buf3Length = 11;
1196 unique_ptr<IOBuf> buf3(IOBuf::create(buf3Length));
1197 memset(buf3->writableData(), 'c', buf3Length);
1198 buf3->append(buf3Length);
1200 socket->writeChain(&wcb1, std::move(buf1));
1202 DelayedWrite write2(socket, std::move(buf2), &wcb2, true);
1203 write2.scheduleTimeout(100);
1205 DelayedWrite write3(socket, std::move(buf3), &wcb3, false, true);
1206 write3.scheduleTimeout(140);
1209 ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
1210 ASSERT_EQ(wcb1.state, STATE_SUCCEEDED);
1211 ASSERT_EQ(wcb2.state, STATE_SUCCEEDED);
1212 if (wcb3.state != STATE_SUCCEEDED) {
1213 throw(wcb3.exception);
1215 ASSERT_EQ(wcb3.state, STATE_SUCCEEDED);
1217 // Make sure the reader got the data with the right grouping
1218 ASSERT_EQ(rcb.state, STATE_SUCCEEDED);
1219 ASSERT_EQ(rcb.buffers.size(), 2);
1220 ASSERT_EQ(rcb.buffers[0].length, buf1Length);
1221 ASSERT_EQ(rcb.buffers[1].length, buf2Length + buf3Length);
1223 acceptedSocket->close();
1226 ASSERT_TRUE(socket->isClosedBySelf());
1227 ASSERT_FALSE(socket->isClosedByPeer());
1231 * Test performing a zero-length write
1233 TEST(AsyncSocketTest, ZeroLengthWrite) {
1238 std::shared_ptr<AsyncSocket> socket =
1239 AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1240 evb.loop(); // loop until the socket is connected
1242 auto acceptedSocket = server.acceptAsync(&evb);
1244 acceptedSocket->setReadCB(&rcb);
1246 size_t len1 = 1024*1024;
1247 size_t len2 = 1024*1024;
1248 std::unique_ptr<char[]> buf(new char[len1 + len2]);
1249 memset(buf.get(), 'a', len1);
1250 memset(buf.get(), 'b', len2);
1256 socket->write(&wcb1, buf.get(), 0);
1257 socket->write(&wcb2, buf.get(), len1);
1258 socket->write(&wcb3, buf.get() + len1, 0);
1259 socket->write(&wcb4, buf.get() + len1, len2);
1262 evb.loop(); // loop until the data is sent
1264 ASSERT_EQ(wcb1.state, STATE_SUCCEEDED);
1265 ASSERT_EQ(wcb2.state, STATE_SUCCEEDED);
1266 ASSERT_EQ(wcb3.state, STATE_SUCCEEDED);
1267 ASSERT_EQ(wcb4.state, STATE_SUCCEEDED);
1268 rcb.verifyData(buf.get(), len1 + len2);
1270 ASSERT_TRUE(socket->isClosedBySelf());
1271 ASSERT_FALSE(socket->isClosedByPeer());
1274 TEST(AsyncSocketTest, ZeroLengthWritev) {
1279 std::shared_ptr<AsyncSocket> socket =
1280 AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1281 evb.loop(); // loop until the socket is connected
1283 auto acceptedSocket = server.acceptAsync(&evb);
1285 acceptedSocket->setReadCB(&rcb);
1287 size_t len1 = 1024*1024;
1288 size_t len2 = 1024*1024;
1289 std::unique_ptr<char[]> buf(new char[len1 + len2]);
1290 memset(buf.get(), 'a', len1);
1291 memset(buf.get(), 'b', len2);
1294 constexpr size_t iovCount = 4;
1295 struct iovec iov[iovCount];
1296 iov[0].iov_base = buf.get();
1297 iov[0].iov_len = len1;
1298 iov[1].iov_base = buf.get() + len1;
1300 iov[2].iov_base = buf.get() + len1;
1301 iov[2].iov_len = len2;
1302 iov[3].iov_base = buf.get() + len1 + len2;
1305 socket->writev(&wcb, iov, iovCount);
1307 evb.loop(); // loop until the data is sent
1309 ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
1310 rcb.verifyData(buf.get(), len1 + len2);
1312 ASSERT_TRUE(socket->isClosedBySelf());
1313 ASSERT_FALSE(socket->isClosedByPeer());
1316 ///////////////////////////////////////////////////////////////////////////
1317 // close() related tests
1318 ///////////////////////////////////////////////////////////////////////////
1321 * Test calling close() with pending writes when the socket is already closing.
1323 TEST(AsyncSocketTest, ClosePendingWritesWhileClosing) {
1328 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
1330 socket->connect(&ccb, server.getAddress(), 30);
1332 // accept the socket on the server side
1333 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
1335 // Loop to ensure the connect has completed
1338 // Make sure we are connected
1339 ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
1341 // Schedule pending writes, until several write attempts have blocked
1343 memset(buf, 'a', sizeof(buf));
1344 typedef vector< std::shared_ptr<WriteCallback> > WriteCallbackVector;
1345 WriteCallbackVector writeCallbacks;
1347 writeCallbacks.reserve(5);
1348 while (writeCallbacks.size() < 5) {
1349 std::shared_ptr<WriteCallback> wcb(new WriteCallback);
1351 socket->write(wcb.get(), buf, sizeof(buf));
1352 if (wcb->state == STATE_SUCCEEDED) {
1353 // Succeeded immediately. Keep performing more writes
1357 // This write is blocked.
1358 // Have the write callback call close() when writeError() is invoked
1359 wcb->errorCallback = std::bind(&AsyncSocket::close, socket.get());
1360 writeCallbacks.push_back(wcb);
1363 // Call closeNow() to immediately fail the pending writes
1366 // Make sure writeError() was invoked on all of the pending write callbacks
1367 for (WriteCallbackVector::const_iterator it = writeCallbacks.begin();
1368 it != writeCallbacks.end();
1370 ASSERT_EQ((*it)->state, STATE_FAILED);
1373 ASSERT_TRUE(socket->isClosedBySelf());
1374 ASSERT_FALSE(socket->isClosedByPeer());
1377 ///////////////////////////////////////////////////////////////////////////
1378 // ImmediateRead related tests
1379 ///////////////////////////////////////////////////////////////////////////
1381 /* AsyncSocket use to verify immediate read works */
1382 class AsyncSocketImmediateRead : public folly::AsyncSocket {
1384 bool immediateReadCalled = false;
1385 explicit AsyncSocketImmediateRead(folly::EventBase* evb) : AsyncSocket(evb) {}
1387 void checkForImmediateRead() noexcept override {
1388 immediateReadCalled = true;
1389 AsyncSocket::handleRead();
1393 TEST(AsyncSocket, ConnectReadImmediateRead) {
1396 const size_t maxBufferSz = 100;
1397 const size_t maxReadsPerEvent = 1;
1398 const size_t expectedDataSz = maxBufferSz * 3;
1399 char expectedData[expectedDataSz];
1400 memset(expectedData, 'j', expectedDataSz);
1403 ReadCallback rcb(maxBufferSz);
1404 AsyncSocketImmediateRead socket(&evb);
1405 socket.connect(nullptr, server.getAddress(), 30);
1407 evb.loop(); // loop until the socket is connected
1409 socket.setReadCB(&rcb);
1410 socket.setMaxReadsPerEvent(maxReadsPerEvent);
1411 socket.immediateReadCalled = false;
1413 auto acceptedSocket = server.acceptAsync(&evb);
1415 ReadCallback rcbServer;
1416 WriteCallback wcbServer;
1417 rcbServer.dataAvailableCallback = [&]() {
1418 if (rcbServer.dataRead() == expectedDataSz) {
1419 // write back all data read
1420 rcbServer.verifyData(expectedData, expectedDataSz);
1421 acceptedSocket->write(&wcbServer, expectedData, expectedDataSz);
1422 acceptedSocket->close();
1425 acceptedSocket->setReadCB(&rcbServer);
1429 socket.write(&wcb1, expectedData, expectedDataSz);
1431 ASSERT_EQ(wcb1.state, STATE_SUCCEEDED);
1432 rcb.verifyData(expectedData, expectedDataSz);
1433 ASSERT_EQ(socket.immediateReadCalled, true);
1435 ASSERT_FALSE(socket.isClosedBySelf());
1436 ASSERT_FALSE(socket.isClosedByPeer());
1439 TEST(AsyncSocket, ConnectReadUninstallRead) {
1442 const size_t maxBufferSz = 100;
1443 const size_t maxReadsPerEvent = 1;
1444 const size_t expectedDataSz = maxBufferSz * 3;
1445 char expectedData[expectedDataSz];
1446 memset(expectedData, 'k', expectedDataSz);
1449 ReadCallback rcb(maxBufferSz);
1450 AsyncSocketImmediateRead socket(&evb);
1451 socket.connect(nullptr, server.getAddress(), 30);
1453 evb.loop(); // loop until the socket is connected
1455 socket.setReadCB(&rcb);
1456 socket.setMaxReadsPerEvent(maxReadsPerEvent);
1457 socket.immediateReadCalled = false;
1459 auto acceptedSocket = server.acceptAsync(&evb);
1461 ReadCallback rcbServer;
1462 WriteCallback wcbServer;
1463 rcbServer.dataAvailableCallback = [&]() {
1464 if (rcbServer.dataRead() == expectedDataSz) {
1465 // write back all data read
1466 rcbServer.verifyData(expectedData, expectedDataSz);
1467 acceptedSocket->write(&wcbServer, expectedData, expectedDataSz);
1468 acceptedSocket->close();
1471 acceptedSocket->setReadCB(&rcbServer);
1473 rcb.dataAvailableCallback = [&]() {
1474 // we read data and reset readCB
1475 socket.setReadCB(nullptr);
1480 socket.write(&wcb, expectedData, expectedDataSz);
1482 ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
1484 /* we shoud've only read maxBufferSz data since readCallback_
1485 * was reset in dataAvailableCallback */
1486 ASSERT_EQ(rcb.dataRead(), maxBufferSz);
1487 ASSERT_EQ(socket.immediateReadCalled, false);
1489 ASSERT_FALSE(socket.isClosedBySelf());
1490 ASSERT_FALSE(socket.isClosedByPeer());
1494 // - Test connect() and have the connect callback set the read callback
1495 // - Test connect() and have the connect callback unset the read callback
1496 // - Test reading/writing/closing/destroying the socket in the connect callback
1497 // - Test reading/writing/closing/destroying the socket in the read callback
1498 // - Test reading/writing/closing/destroying the socket in the write callback
1499 // - Test one-way shutdown behavior
1500 // - Test changing the EventBase
1502 // - TODO: test multiple threads sharing a AsyncSocket, and detaching from it
1503 // in connectSuccess(), readDataAvailable(), writeSuccess()
1506 ///////////////////////////////////////////////////////////////////////////
1507 // AsyncServerSocket tests
1508 ///////////////////////////////////////////////////////////////////////////
1511 * Helper ConnectionEventCallback class for the test code.
1512 * It maintains counters protected by a spin lock.
1514 class TestConnectionEventCallback :
1515 public AsyncServerSocket::ConnectionEventCallback {
1517 virtual void onConnectionAccepted(
1518 const int /* socket */,
1519 const SocketAddress& /* addr */) noexcept override {
1520 folly::RWSpinLock::WriteHolder holder(spinLock_);
1521 connectionAccepted_++;
1524 virtual void onConnectionAcceptError(const int /* err */) noexcept override {
1525 folly::RWSpinLock::WriteHolder holder(spinLock_);
1526 connectionAcceptedError_++;
1529 virtual void onConnectionDropped(
1530 const int /* socket */,
1531 const SocketAddress& /* addr */) noexcept override {
1532 folly::RWSpinLock::WriteHolder holder(spinLock_);
1533 connectionDropped_++;
1536 virtual void onConnectionEnqueuedForAcceptorCallback(
1537 const int /* socket */,
1538 const SocketAddress& /* addr */) noexcept override {
1539 folly::RWSpinLock::WriteHolder holder(spinLock_);
1540 connectionEnqueuedForAcceptCallback_++;
1543 virtual void onConnectionDequeuedByAcceptorCallback(
1544 const int /* socket */,
1545 const SocketAddress& /* addr */) noexcept override {
1546 folly::RWSpinLock::WriteHolder holder(spinLock_);
1547 connectionDequeuedByAcceptCallback_++;
1550 virtual void onBackoffStarted() noexcept override {
1551 folly::RWSpinLock::WriteHolder holder(spinLock_);
1555 virtual void onBackoffEnded() noexcept override {
1556 folly::RWSpinLock::WriteHolder holder(spinLock_);
1560 virtual void onBackoffError() noexcept override {
1561 folly::RWSpinLock::WriteHolder holder(spinLock_);
1565 unsigned int getConnectionAccepted() const {
1566 folly::RWSpinLock::ReadHolder holder(spinLock_);
1567 return connectionAccepted_;
1570 unsigned int getConnectionAcceptedError() const {
1571 folly::RWSpinLock::ReadHolder holder(spinLock_);
1572 return connectionAcceptedError_;
1575 unsigned int getConnectionDropped() const {
1576 folly::RWSpinLock::ReadHolder holder(spinLock_);
1577 return connectionDropped_;
1580 unsigned int getConnectionEnqueuedForAcceptCallback() const {
1581 folly::RWSpinLock::ReadHolder holder(spinLock_);
1582 return connectionEnqueuedForAcceptCallback_;
1585 unsigned int getConnectionDequeuedByAcceptCallback() const {
1586 folly::RWSpinLock::ReadHolder holder(spinLock_);
1587 return connectionDequeuedByAcceptCallback_;
1590 unsigned int getBackoffStarted() const {
1591 folly::RWSpinLock::ReadHolder holder(spinLock_);
1592 return backoffStarted_;
1595 unsigned int getBackoffEnded() const {
1596 folly::RWSpinLock::ReadHolder holder(spinLock_);
1597 return backoffEnded_;
1600 unsigned int getBackoffError() const {
1601 folly::RWSpinLock::ReadHolder holder(spinLock_);
1602 return backoffError_;
1606 mutable folly::RWSpinLock spinLock_;
1607 unsigned int connectionAccepted_{0};
1608 unsigned int connectionAcceptedError_{0};
1609 unsigned int connectionDropped_{0};
1610 unsigned int connectionEnqueuedForAcceptCallback_{0};
1611 unsigned int connectionDequeuedByAcceptCallback_{0};
1612 unsigned int backoffStarted_{0};
1613 unsigned int backoffEnded_{0};
1614 unsigned int backoffError_{0};
1618 * Helper AcceptCallback class for the test code
1619 * It records the callbacks that were invoked, and also supports calling
1620 * generic std::function objects in each callback.
1622 class TestAcceptCallback : public AsyncServerSocket::AcceptCallback {
1631 EventInfo(int fd, const folly::SocketAddress& addr)
1632 : type(TYPE_ACCEPT),
1636 explicit EventInfo(const std::string& msg)
1641 explicit EventInfo(EventType et)
1648 int fd; // valid for TYPE_ACCEPT
1649 folly::SocketAddress address; // valid for TYPE_ACCEPT
1650 string errorMsg; // valid for TYPE_ERROR
1652 typedef std::deque<EventInfo> EventList;
1654 TestAcceptCallback()
1655 : connectionAcceptedFn_(),
1660 std::deque<EventInfo>* getEvents() {
1664 void setConnectionAcceptedFn(
1665 const std::function<void(int, const folly::SocketAddress&)>& fn) {
1666 connectionAcceptedFn_ = fn;
1668 void setAcceptErrorFn(const std::function<void(const std::exception&)>& fn) {
1669 acceptErrorFn_ = fn;
1671 void setAcceptStartedFn(const std::function<void()>& fn) {
1672 acceptStartedFn_ = fn;
1674 void setAcceptStoppedFn(const std::function<void()>& fn) {
1675 acceptStoppedFn_ = fn;
1678 void connectionAccepted(
1679 int fd, const folly::SocketAddress& clientAddr) noexcept override {
1680 events_.emplace_back(fd, clientAddr);
1682 if (connectionAcceptedFn_) {
1683 connectionAcceptedFn_(fd, clientAddr);
1686 void acceptError(const std::exception& ex) noexcept override {
1687 events_.emplace_back(ex.what());
1689 if (acceptErrorFn_) {
1693 void acceptStarted() noexcept override {
1694 events_.emplace_back(TYPE_START);
1696 if (acceptStartedFn_) {
1700 void acceptStopped() noexcept override {
1701 events_.emplace_back(TYPE_STOP);
1703 if (acceptStoppedFn_) {
1709 std::function<void(int, const folly::SocketAddress&)> connectionAcceptedFn_;
1710 std::function<void(const std::exception&)> acceptErrorFn_;
1711 std::function<void()> acceptStartedFn_;
1712 std::function<void()> acceptStoppedFn_;
1714 std::deque<EventInfo> events_;
1719 * Make sure accepted sockets have O_NONBLOCK and TCP_NODELAY set
1721 TEST(AsyncSocketTest, ServerAcceptOptions) {
1722 EventBase eventBase;
1724 // Create a server socket
1725 std::shared_ptr<AsyncServerSocket> serverSocket(
1726 AsyncServerSocket::newSocket(&eventBase));
1727 serverSocket->bind(0);
1728 serverSocket->listen(16);
1729 folly::SocketAddress serverAddress;
1730 serverSocket->getAddress(&serverAddress);
1732 // Add a callback to accept one connection then stop the loop
1733 TestAcceptCallback acceptCallback;
1734 acceptCallback.setConnectionAcceptedFn(
1735 [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1736 serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
1738 acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
1739 serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
1741 serverSocket->addAcceptCallback(&acceptCallback, &eventBase);
1742 serverSocket->startAccepting();
1744 // Connect to the server socket
1745 std::shared_ptr<AsyncSocket> socket(
1746 AsyncSocket::newSocket(&eventBase, serverAddress));
1750 // Verify that the server accepted a connection
1751 ASSERT_EQ(acceptCallback.getEvents()->size(), 3);
1752 ASSERT_EQ(acceptCallback.getEvents()->at(0).type,
1753 TestAcceptCallback::TYPE_START);
1754 ASSERT_EQ(acceptCallback.getEvents()->at(1).type,
1755 TestAcceptCallback::TYPE_ACCEPT);
1756 ASSERT_EQ(acceptCallback.getEvents()->at(2).type,
1757 TestAcceptCallback::TYPE_STOP);
1758 int fd = acceptCallback.getEvents()->at(1).fd;
1760 // The accepted connection should already be in non-blocking mode
1761 int flags = fcntl(fd, F_GETFL, 0);
1762 ASSERT_EQ(flags & O_NONBLOCK, O_NONBLOCK);
1765 // The accepted connection should already have TCP_NODELAY set
1767 socklen_t valueLength = sizeof(value);
1768 int rc = getsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &value, &valueLength);
1770 ASSERT_EQ(value, 1);
1775 * Test AsyncServerSocket::removeAcceptCallback()
1777 TEST(AsyncSocketTest, RemoveAcceptCallback) {
1778 // Create a new AsyncServerSocket
1779 EventBase eventBase;
1780 std::shared_ptr<AsyncServerSocket> serverSocket(
1781 AsyncServerSocket::newSocket(&eventBase));
1782 serverSocket->bind(0);
1783 serverSocket->listen(16);
1784 folly::SocketAddress serverAddress;
1785 serverSocket->getAddress(&serverAddress);
1787 // Add several accept callbacks
1788 TestAcceptCallback cb1;
1789 TestAcceptCallback cb2;
1790 TestAcceptCallback cb3;
1791 TestAcceptCallback cb4;
1792 TestAcceptCallback cb5;
1793 TestAcceptCallback cb6;
1794 TestAcceptCallback cb7;
1796 // Test having callbacks remove other callbacks before them on the list,
1797 // after them on the list, or removing themselves.
1799 // Have callback 2 remove callback 3 and callback 5 the first time it is
1802 cb1.setConnectionAcceptedFn([&](int /* fd */,
1803 const folly::SocketAddress& /* addr */) {
1804 std::shared_ptr<AsyncSocket> sock2(
1805 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb2: -cb3 -cb5
1807 cb3.setConnectionAcceptedFn(
1808 [&](int /* fd */, const folly::SocketAddress& /* addr */) {});
1809 cb4.setConnectionAcceptedFn(
1810 [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1811 std::shared_ptr<AsyncSocket> sock3(
1812 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb4
1814 cb5.setConnectionAcceptedFn(
1815 [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1816 std::shared_ptr<AsyncSocket> sock5(
1817 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb7: -cb7
1820 cb2.setConnectionAcceptedFn(
1821 [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1822 if (cb2Count == 0) {
1823 serverSocket->removeAcceptCallback(&cb3, nullptr);
1824 serverSocket->removeAcceptCallback(&cb5, nullptr);
1828 // Have callback 6 remove callback 4 the first time it is called,
1829 // and destroy the server socket the second time it is called
1831 cb6.setConnectionAcceptedFn(
1832 [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1833 if (cb6Count == 0) {
1834 serverSocket->removeAcceptCallback(&cb4, nullptr);
1835 std::shared_ptr<AsyncSocket> sock6(
1836 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
1837 std::shared_ptr<AsyncSocket> sock7(
1838 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb2
1839 std::shared_ptr<AsyncSocket> sock8(
1840 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb6: stop
1843 serverSocket.reset();
1847 // Have callback 7 remove itself
1848 cb7.setConnectionAcceptedFn(
1849 [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1850 serverSocket->removeAcceptCallback(&cb7, nullptr);
1853 serverSocket->addAcceptCallback(&cb1, &eventBase);
1854 serverSocket->addAcceptCallback(&cb2, &eventBase);
1855 serverSocket->addAcceptCallback(&cb3, &eventBase);
1856 serverSocket->addAcceptCallback(&cb4, &eventBase);
1857 serverSocket->addAcceptCallback(&cb5, &eventBase);
1858 serverSocket->addAcceptCallback(&cb6, &eventBase);
1859 serverSocket->addAcceptCallback(&cb7, &eventBase);
1860 serverSocket->startAccepting();
1862 // Make several connections to the socket
1863 std::shared_ptr<AsyncSocket> sock1(
1864 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
1865 std::shared_ptr<AsyncSocket> sock4(
1866 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb6: -cb4
1868 // Loop until we are stopped
1871 // Check to make sure that the expected callbacks were invoked.
1873 // NOTE: This code depends on the AsyncServerSocket operating calling all of
1874 // the AcceptCallbacks in round-robin fashion, in the order that they were
1875 // added. The code is implemented this way right now, but the API doesn't
1876 // explicitly require it be done this way. If we change the code not to be
1877 // exactly round robin in the future, we can simplify the test checks here.
1878 // (We'll also need to update the termination code, since we expect cb6 to
1879 // get called twice to terminate the loop.)
1880 ASSERT_EQ(cb1.getEvents()->size(), 4);
1881 ASSERT_EQ(cb1.getEvents()->at(0).type,
1882 TestAcceptCallback::TYPE_START);
1883 ASSERT_EQ(cb1.getEvents()->at(1).type,
1884 TestAcceptCallback::TYPE_ACCEPT);
1885 ASSERT_EQ(cb1.getEvents()->at(2).type,
1886 TestAcceptCallback::TYPE_ACCEPT);
1887 ASSERT_EQ(cb1.getEvents()->at(3).type,
1888 TestAcceptCallback::TYPE_STOP);
1890 ASSERT_EQ(cb2.getEvents()->size(), 4);
1891 ASSERT_EQ(cb2.getEvents()->at(0).type,
1892 TestAcceptCallback::TYPE_START);
1893 ASSERT_EQ(cb2.getEvents()->at(1).type,
1894 TestAcceptCallback::TYPE_ACCEPT);
1895 ASSERT_EQ(cb2.getEvents()->at(2).type,
1896 TestAcceptCallback::TYPE_ACCEPT);
1897 ASSERT_EQ(cb2.getEvents()->at(3).type,
1898 TestAcceptCallback::TYPE_STOP);
1900 ASSERT_EQ(cb3.getEvents()->size(), 2);
1901 ASSERT_EQ(cb3.getEvents()->at(0).type,
1902 TestAcceptCallback::TYPE_START);
1903 ASSERT_EQ(cb3.getEvents()->at(1).type,
1904 TestAcceptCallback::TYPE_STOP);
1906 ASSERT_EQ(cb4.getEvents()->size(), 3);
1907 ASSERT_EQ(cb4.getEvents()->at(0).type,
1908 TestAcceptCallback::TYPE_START);
1909 ASSERT_EQ(cb4.getEvents()->at(1).type,
1910 TestAcceptCallback::TYPE_ACCEPT);
1911 ASSERT_EQ(cb4.getEvents()->at(2).type,
1912 TestAcceptCallback::TYPE_STOP);
1914 ASSERT_EQ(cb5.getEvents()->size(), 2);
1915 ASSERT_EQ(cb5.getEvents()->at(0).type,
1916 TestAcceptCallback::TYPE_START);
1917 ASSERT_EQ(cb5.getEvents()->at(1).type,
1918 TestAcceptCallback::TYPE_STOP);
1920 ASSERT_EQ(cb6.getEvents()->size(), 4);
1921 ASSERT_EQ(cb6.getEvents()->at(0).type,
1922 TestAcceptCallback::TYPE_START);
1923 ASSERT_EQ(cb6.getEvents()->at(1).type,
1924 TestAcceptCallback::TYPE_ACCEPT);
1925 ASSERT_EQ(cb6.getEvents()->at(2).type,
1926 TestAcceptCallback::TYPE_ACCEPT);
1927 ASSERT_EQ(cb6.getEvents()->at(3).type,
1928 TestAcceptCallback::TYPE_STOP);
1930 ASSERT_EQ(cb7.getEvents()->size(), 3);
1931 ASSERT_EQ(cb7.getEvents()->at(0).type,
1932 TestAcceptCallback::TYPE_START);
1933 ASSERT_EQ(cb7.getEvents()->at(1).type,
1934 TestAcceptCallback::TYPE_ACCEPT);
1935 ASSERT_EQ(cb7.getEvents()->at(2).type,
1936 TestAcceptCallback::TYPE_STOP);
1940 * Test AsyncServerSocket::removeAcceptCallback()
1942 TEST(AsyncSocketTest, OtherThreadAcceptCallback) {
1943 // Create a new AsyncServerSocket
1944 EventBase eventBase;
1945 std::shared_ptr<AsyncServerSocket> serverSocket(
1946 AsyncServerSocket::newSocket(&eventBase));
1947 serverSocket->bind(0);
1948 serverSocket->listen(16);
1949 folly::SocketAddress serverAddress;
1950 serverSocket->getAddress(&serverAddress);
1952 // Add several accept callbacks
1953 TestAcceptCallback cb1;
1954 auto thread_id = std::this_thread::get_id();
1955 cb1.setAcceptStartedFn([&](){
1956 CHECK_NE(thread_id, std::this_thread::get_id());
1957 thread_id = std::this_thread::get_id();
1959 cb1.setConnectionAcceptedFn(
1960 [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1961 ASSERT_EQ(thread_id, std::this_thread::get_id());
1962 serverSocket->removeAcceptCallback(&cb1, &eventBase);
1964 cb1.setAcceptStoppedFn([&](){
1965 ASSERT_EQ(thread_id, std::this_thread::get_id());
1968 // Test having callbacks remove other callbacks before them on the list,
1969 serverSocket->addAcceptCallback(&cb1, &eventBase);
1970 serverSocket->startAccepting();
1972 // Make several connections to the socket
1973 std::shared_ptr<AsyncSocket> sock1(
1974 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
1976 // Loop in another thread
1977 auto other = std::thread([&](){
1982 // Check to make sure that the expected callbacks were invoked.
1984 // NOTE: This code depends on the AsyncServerSocket operating calling all of
1985 // the AcceptCallbacks in round-robin fashion, in the order that they were
1986 // added. The code is implemented this way right now, but the API doesn't
1987 // explicitly require it be done this way. If we change the code not to be
1988 // exactly round robin in the future, we can simplify the test checks here.
1989 // (We'll also need to update the termination code, since we expect cb6 to
1990 // get called twice to terminate the loop.)
1991 ASSERT_EQ(cb1.getEvents()->size(), 3);
1992 ASSERT_EQ(cb1.getEvents()->at(0).type,
1993 TestAcceptCallback::TYPE_START);
1994 ASSERT_EQ(cb1.getEvents()->at(1).type,
1995 TestAcceptCallback::TYPE_ACCEPT);
1996 ASSERT_EQ(cb1.getEvents()->at(2).type,
1997 TestAcceptCallback::TYPE_STOP);
2001 void serverSocketSanityTest(AsyncServerSocket* serverSocket) {
2002 EventBase* eventBase = serverSocket->getEventBase();
2005 // Add a callback to accept one connection then stop accepting
2006 TestAcceptCallback acceptCallback;
2007 acceptCallback.setConnectionAcceptedFn(
2008 [&](int /* fd */, const folly::SocketAddress& /* addr */) {
2009 serverSocket->removeAcceptCallback(&acceptCallback, eventBase);
2011 acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
2012 serverSocket->removeAcceptCallback(&acceptCallback, eventBase);
2014 serverSocket->addAcceptCallback(&acceptCallback, eventBase);
2015 serverSocket->startAccepting();
2017 // Connect to the server socket
2018 folly::SocketAddress serverAddress;
2019 serverSocket->getAddress(&serverAddress);
2020 AsyncSocket::UniquePtr socket(new AsyncSocket(eventBase, serverAddress));
2022 // Loop to process all events
2025 // Verify that the server accepted a connection
2026 ASSERT_EQ(acceptCallback.getEvents()->size(), 3);
2027 ASSERT_EQ(acceptCallback.getEvents()->at(0).type,
2028 TestAcceptCallback::TYPE_START);
2029 ASSERT_EQ(acceptCallback.getEvents()->at(1).type,
2030 TestAcceptCallback::TYPE_ACCEPT);
2031 ASSERT_EQ(acceptCallback.getEvents()->at(2).type,
2032 TestAcceptCallback::TYPE_STOP);
2035 /* Verify that we don't leak sockets if we are destroyed()
2036 * and there are still writes pending
2038 * If destroy() only calls close() instead of closeNow(),
2039 * it would shutdown(writes) on the socket, but it would
2040 * never be close()'d, and the socket would leak
2042 TEST(AsyncSocketTest, DestroyCloseTest) {
2048 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&clientEB);
2050 socket->connect(&ccb, server.getAddress(), 30);
2052 // Accept the connection
2053 std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&serverEB);
2055 acceptedSocket->setReadCB(&rcb);
2057 // Write a large buffer to the socket that is larger than kernel buffer
2058 size_t simpleBufLength = 5000000;
2059 char* simpleBuf = new char[simpleBufLength];
2060 memset(simpleBuf, 'a', simpleBufLength);
2063 // Let the reads and writes run to completion
2064 int fd = acceptedSocket->getFd();
2066 acceptedSocket->write(&wcb, simpleBuf, simpleBufLength);
2068 acceptedSocket.reset();
2070 // Test that server socket was closed
2071 ssize_t sz = read(fd, simpleBuf, simpleBufLength);
2073 ASSERT_EQ(errno, 9);
2078 * Test AsyncServerSocket::useExistingSocket()
2080 TEST(AsyncSocketTest, ServerExistingSocket) {
2081 EventBase eventBase;
2083 // Test creating a socket, and letting AsyncServerSocket bind and listen
2085 // Manually create a socket
2086 int fd = fsp::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
2089 // Create a server socket
2090 AsyncServerSocket::UniquePtr serverSocket(
2091 new AsyncServerSocket(&eventBase));
2092 serverSocket->useExistingSocket(fd);
2093 folly::SocketAddress address;
2094 serverSocket->getAddress(&address);
2096 serverSocket->bind(address);
2097 serverSocket->listen(16);
2099 // Make sure the socket works
2100 serverSocketSanityTest(serverSocket.get());
2103 // Test creating a socket and binding manually,
2104 // then letting AsyncServerSocket listen
2106 // Manually create a socket
2107 int fd = fsp::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
2110 struct sockaddr_in addr;
2111 addr.sin_family = AF_INET;
2113 addr.sin_addr.s_addr = INADDR_ANY;
2114 ASSERT_EQ(bind(fd, reinterpret_cast<struct sockaddr*>(&addr),
2116 // Look up the address that we bound to
2117 folly::SocketAddress boundAddress;
2118 boundAddress.setFromLocalAddress(fd);
2120 // Create a server socket
2121 AsyncServerSocket::UniquePtr serverSocket(
2122 new AsyncServerSocket(&eventBase));
2123 serverSocket->useExistingSocket(fd);
2124 serverSocket->listen(16);
2126 // Make sure AsyncServerSocket reports the same address that we bound to
2127 folly::SocketAddress serverSocketAddress;
2128 serverSocket->getAddress(&serverSocketAddress);
2129 ASSERT_EQ(boundAddress, serverSocketAddress);
2131 // Make sure the socket works
2132 serverSocketSanityTest(serverSocket.get());
2135 // Test creating a socket, binding and listening manually,
2136 // then giving it to AsyncServerSocket
2138 // Manually create a socket
2139 int fd = fsp::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
2142 struct sockaddr_in addr;
2143 addr.sin_family = AF_INET;
2145 addr.sin_addr.s_addr = INADDR_ANY;
2146 ASSERT_EQ(bind(fd, reinterpret_cast<struct sockaddr*>(&addr),
2148 // Look up the address that we bound to
2149 folly::SocketAddress boundAddress;
2150 boundAddress.setFromLocalAddress(fd);
2152 ASSERT_EQ(listen(fd, 16), 0);
2154 // Create a server socket
2155 AsyncServerSocket::UniquePtr serverSocket(
2156 new AsyncServerSocket(&eventBase));
2157 serverSocket->useExistingSocket(fd);
2159 // Make sure AsyncServerSocket reports the same address that we bound to
2160 folly::SocketAddress serverSocketAddress;
2161 serverSocket->getAddress(&serverSocketAddress);
2162 ASSERT_EQ(boundAddress, serverSocketAddress);
2164 // Make sure the socket works
2165 serverSocketSanityTest(serverSocket.get());
2169 TEST(AsyncSocketTest, UnixDomainSocketTest) {
2170 EventBase eventBase;
2172 // Create a server socket
2173 std::shared_ptr<AsyncServerSocket> serverSocket(
2174 AsyncServerSocket::newSocket(&eventBase));
2176 path.append(folly::to<string>("/anonymous", folly::Random::rand64()));
2177 folly::SocketAddress serverAddress;
2178 serverAddress.setFromPath(path);
2179 serverSocket->bind(serverAddress);
2180 serverSocket->listen(16);
2182 // Add a callback to accept one connection then stop the loop
2183 TestAcceptCallback acceptCallback;
2184 acceptCallback.setConnectionAcceptedFn(
2185 [&](int /* fd */, const folly::SocketAddress& /* addr */) {
2186 serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
2188 acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
2189 serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
2191 serverSocket->addAcceptCallback(&acceptCallback, &eventBase);
2192 serverSocket->startAccepting();
2194 // Connect to the server socket
2195 std::shared_ptr<AsyncSocket> socket(
2196 AsyncSocket::newSocket(&eventBase, serverAddress));
2200 // Verify that the server accepted a connection
2201 ASSERT_EQ(acceptCallback.getEvents()->size(), 3);
2202 ASSERT_EQ(acceptCallback.getEvents()->at(0).type,
2203 TestAcceptCallback::TYPE_START);
2204 ASSERT_EQ(acceptCallback.getEvents()->at(1).type,
2205 TestAcceptCallback::TYPE_ACCEPT);
2206 ASSERT_EQ(acceptCallback.getEvents()->at(2).type,
2207 TestAcceptCallback::TYPE_STOP);
2208 int fd = acceptCallback.getEvents()->at(1).fd;
2210 // The accepted connection should already be in non-blocking mode
2211 int flags = fcntl(fd, F_GETFL, 0);
2212 ASSERT_EQ(flags & O_NONBLOCK, O_NONBLOCK);
2215 TEST(AsyncSocketTest, ConnectionEventCallbackDefault) {
2216 EventBase eventBase;
2217 TestConnectionEventCallback connectionEventCallback;
2219 // Create a server socket
2220 std::shared_ptr<AsyncServerSocket> serverSocket(
2221 AsyncServerSocket::newSocket(&eventBase));
2222 serverSocket->setConnectionEventCallback(&connectionEventCallback);
2223 serverSocket->bind(0);
2224 serverSocket->listen(16);
2225 folly::SocketAddress serverAddress;
2226 serverSocket->getAddress(&serverAddress);
2228 // Add a callback to accept one connection then stop the loop
2229 TestAcceptCallback acceptCallback;
2230 acceptCallback.setConnectionAcceptedFn(
2231 [&](int /* fd */, const folly::SocketAddress& /* addr */) {
2232 serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
2234 acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
2235 serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
2237 serverSocket->addAcceptCallback(&acceptCallback, &eventBase);
2238 serverSocket->startAccepting();
2240 // Connect to the server socket
2241 std::shared_ptr<AsyncSocket> socket(
2242 AsyncSocket::newSocket(&eventBase, serverAddress));
2246 // Validate the connection event counters
2247 ASSERT_EQ(connectionEventCallback.getConnectionAccepted(), 1);
2248 ASSERT_EQ(connectionEventCallback.getConnectionAcceptedError(), 0);
2249 ASSERT_EQ(connectionEventCallback.getConnectionDropped(), 0);
2251 connectionEventCallback.getConnectionEnqueuedForAcceptCallback(), 1);
2252 ASSERT_EQ(connectionEventCallback.getConnectionDequeuedByAcceptCallback(), 1);
2253 ASSERT_EQ(connectionEventCallback.getBackoffStarted(), 0);
2254 ASSERT_EQ(connectionEventCallback.getBackoffEnded(), 0);
2255 ASSERT_EQ(connectionEventCallback.getBackoffError(), 0);
2258 TEST(AsyncSocketTest, CallbackInPrimaryEventBase) {
2259 EventBase eventBase;
2260 TestConnectionEventCallback connectionEventCallback;
2262 // Create a server socket
2263 std::shared_ptr<AsyncServerSocket> serverSocket(
2264 AsyncServerSocket::newSocket(&eventBase));
2265 serverSocket->setConnectionEventCallback(&connectionEventCallback);
2266 serverSocket->bind(0);
2267 serverSocket->listen(16);
2268 folly::SocketAddress serverAddress;
2269 serverSocket->getAddress(&serverAddress);
2271 // Add a callback to accept one connection then stop the loop
2272 TestAcceptCallback acceptCallback;
2273 acceptCallback.setConnectionAcceptedFn(
2274 [&](int /* fd */, const folly::SocketAddress& /* addr */) {
2275 serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
2277 acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
2278 serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
2280 bool acceptStartedFlag{false};
2281 acceptCallback.setAcceptStartedFn([&acceptStartedFlag](){
2282 acceptStartedFlag = true;
2284 bool acceptStoppedFlag{false};
2285 acceptCallback.setAcceptStoppedFn([&acceptStoppedFlag](){
2286 acceptStoppedFlag = true;
2288 serverSocket->addAcceptCallback(&acceptCallback, nullptr);
2289 serverSocket->startAccepting();
2291 // Connect to the server socket
2292 std::shared_ptr<AsyncSocket> socket(
2293 AsyncSocket::newSocket(&eventBase, serverAddress));
2297 ASSERT_TRUE(acceptStartedFlag);
2298 ASSERT_TRUE(acceptStoppedFlag);
2299 // Validate the connection event counters
2300 ASSERT_EQ(connectionEventCallback.getConnectionAccepted(), 1);
2301 ASSERT_EQ(connectionEventCallback.getConnectionAcceptedError(), 0);
2302 ASSERT_EQ(connectionEventCallback.getConnectionDropped(), 0);
2304 connectionEventCallback.getConnectionEnqueuedForAcceptCallback(), 0);
2305 ASSERT_EQ(connectionEventCallback.getConnectionDequeuedByAcceptCallback(), 0);
2306 ASSERT_EQ(connectionEventCallback.getBackoffStarted(), 0);
2307 ASSERT_EQ(connectionEventCallback.getBackoffEnded(), 0);
2308 ASSERT_EQ(connectionEventCallback.getBackoffError(), 0);
2314 * Test AsyncServerSocket::getNumPendingMessagesInQueue()
2316 TEST(AsyncSocketTest, NumPendingMessagesInQueue) {
2317 EventBase eventBase;
2319 // Counter of how many connections have been accepted
2322 // Create a server socket
2323 auto serverSocket(AsyncServerSocket::newSocket(&eventBase));
2324 serverSocket->bind(0);
2325 serverSocket->listen(16);
2326 folly::SocketAddress serverAddress;
2327 serverSocket->getAddress(&serverAddress);
2329 // Add a callback to accept connections
2330 TestAcceptCallback acceptCallback;
2331 acceptCallback.setConnectionAcceptedFn(
2332 [&](int /* fd */, const folly::SocketAddress& /* addr */) {
2334 ASSERT_EQ(4 - count, serverSocket->getNumPendingMessagesInQueue());
2337 // all messages are processed, remove accept callback
2338 serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
2341 acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
2342 serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
2344 serverSocket->addAcceptCallback(&acceptCallback, &eventBase);
2345 serverSocket->startAccepting();
2347 // Connect to the server socket, 4 clients, there are 4 connections
2348 auto socket1(AsyncSocket::newSocket(&eventBase, serverAddress));
2349 auto socket2(AsyncSocket::newSocket(&eventBase, serverAddress));
2350 auto socket3(AsyncSocket::newSocket(&eventBase, serverAddress));
2351 auto socket4(AsyncSocket::newSocket(&eventBase, serverAddress));
2357 * Test AsyncTransport::BufferCallback
2359 TEST(AsyncSocketTest, BufferTest) {
2363 AsyncSocket::OptionMap option{{{SOL_SOCKET, SO_SNDBUF}, 128}};
2364 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
2366 socket->connect(&ccb, server.getAddress(), 30, option);
2368 char buf[100 * 1024];
2369 memset(buf, 'c', sizeof(buf));
2372 socket->setBufferCallback(&bcb);
2373 socket->write(&wcb, buf, sizeof(buf), WriteFlags::NONE);
2376 ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
2377 ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
2379 ASSERT_TRUE(bcb.hasBuffered());
2380 ASSERT_TRUE(bcb.hasBufferCleared());
2383 server.verifyConnection(buf, sizeof(buf));
2385 ASSERT_TRUE(socket->isClosedBySelf());
2386 ASSERT_FALSE(socket->isClosedByPeer());
2389 TEST(AsyncSocketTest, BufferCallbackKill) {
2392 AsyncSocket::OptionMap option{{{SOL_SOCKET, SO_SNDBUF}, 128}};
2393 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
2395 socket->connect(&ccb, server.getAddress(), 30, option);
2398 char buf[100 * 1024];
2399 memset(buf, 'c', sizeof(buf));
2401 socket->setBufferCallback(&bcb);
2403 wcb.successCallback = [&] {
2404 ASSERT_TRUE(socket.unique());
2408 // This will trigger AsyncSocket::handleWrite,
2409 // which calls WriteCallback::writeSuccess,
2410 // which calls wcb.successCallback above,
2411 // which tries to delete socket
2412 // Then, the socket will also try to use this BufferCallback
2413 // And that should crash us, if there is no DestructorGuard on the stack
2414 socket->write(&wcb, buf, sizeof(buf), WriteFlags::NONE);
2417 ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
2421 TEST(AsyncSocketTest, ConnectTFO) {
2422 // Start listening on a local port
2423 TestServer server(true);
2425 // Connect using a AsyncSocket
2427 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
2428 socket->enableTFO();
2430 socket->connect(&cb, server.getAddress(), 30);
2432 std::array<uint8_t, 128> buf;
2433 memset(buf.data(), 'a', buf.size());
2435 std::array<uint8_t, 3> readBuf;
2436 auto sendBuf = IOBuf::copyBuffer("hey");
2439 auto acceptedSocket = server.accept();
2440 acceptedSocket->write(buf.data(), buf.size());
2441 acceptedSocket->flush();
2442 acceptedSocket->readAll(readBuf.data(), readBuf.size());
2443 acceptedSocket->close();
2448 ASSERT_EQ(cb.state, STATE_SUCCEEDED);
2449 EXPECT_LE(0, socket->getConnectTime().count());
2450 EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
2451 EXPECT_TRUE(socket->getTFOAttempted());
2453 // Should trigger the connect
2454 WriteCallback write;
2456 socket->writeChain(&write, sendBuf->clone());
2457 socket->setReadCB(&rcb);
2462 EXPECT_EQ(STATE_SUCCEEDED, write.state);
2463 EXPECT_EQ(0, memcmp(readBuf.data(), sendBuf->data(), readBuf.size()));
2464 EXPECT_EQ(STATE_SUCCEEDED, rcb.state);
2465 ASSERT_EQ(1, rcb.buffers.size());
2466 ASSERT_EQ(sizeof(buf), rcb.buffers[0].length);
2467 EXPECT_EQ(0, memcmp(rcb.buffers[0].buffer, buf.data(), buf.size()));
2468 EXPECT_EQ(socket->getTFOFinished(), socket->getTFOSucceded());
2471 TEST(AsyncSocketTest, ConnectTFOSupplyEarlyReadCB) {
2472 // Start listening on a local port
2473 TestServer server(true);
2475 // Connect using a AsyncSocket
2477 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
2478 socket->enableTFO();
2480 socket->connect(&cb, server.getAddress(), 30);
2482 socket->setReadCB(&rcb);
2484 std::array<uint8_t, 128> buf;
2485 memset(buf.data(), 'a', buf.size());
2487 std::array<uint8_t, 3> readBuf;
2488 auto sendBuf = IOBuf::copyBuffer("hey");
2491 auto acceptedSocket = server.accept();
2492 acceptedSocket->write(buf.data(), buf.size());
2493 acceptedSocket->flush();
2494 acceptedSocket->readAll(readBuf.data(), readBuf.size());
2495 acceptedSocket->close();
2500 ASSERT_EQ(cb.state, STATE_SUCCEEDED);
2501 EXPECT_LE(0, socket->getConnectTime().count());
2502 EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
2503 EXPECT_TRUE(socket->getTFOAttempted());
2505 // Should trigger the connect
2506 WriteCallback write;
2507 socket->writeChain(&write, sendBuf->clone());
2512 EXPECT_EQ(STATE_SUCCEEDED, write.state);
2513 EXPECT_EQ(0, memcmp(readBuf.data(), sendBuf->data(), readBuf.size()));
2514 EXPECT_EQ(STATE_SUCCEEDED, rcb.state);
2515 ASSERT_EQ(1, rcb.buffers.size());
2516 ASSERT_EQ(sizeof(buf), rcb.buffers[0].length);
2517 EXPECT_EQ(0, memcmp(rcb.buffers[0].buffer, buf.data(), buf.size()));
2518 EXPECT_EQ(socket->getTFOFinished(), socket->getTFOSucceded());
2522 * Test connecting to a server that isn't listening
2524 TEST(AsyncSocketTest, ConnectRefusedTFO) {
2527 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
2529 socket->enableTFO();
2531 // Hopefully nothing is actually listening on this address
2532 folly::SocketAddress addr("::1", 65535);
2534 socket->connect(&cb, addr, 30);
2538 WriteCallback write1;
2539 // Trigger the connect if TFO attempt is supported.
2540 socket->writeChain(&write1, IOBuf::copyBuffer("hey"));
2542 WriteCallback write2;
2543 socket->writeChain(&write2, IOBuf::copyBuffer("hey"));
2546 if (!socket->getTFOFinished()) {
2547 EXPECT_EQ(STATE_FAILED, write1.state);
2549 EXPECT_EQ(STATE_SUCCEEDED, write1.state);
2550 EXPECT_FALSE(socket->getTFOSucceded());
2553 EXPECT_EQ(STATE_FAILED, write2.state);
2555 EXPECT_EQ(STATE_SUCCEEDED, cb.state);
2556 EXPECT_LE(0, socket->getConnectTime().count());
2557 EXPECT_EQ(std::chrono::milliseconds(30), socket->getConnectTimeout());
2558 EXPECT_TRUE(socket->getTFOAttempted());
2562 * Test calling closeNow() immediately after connecting.
2564 TEST(AsyncSocketTest, ConnectWriteAndCloseNowTFO) {
2565 TestServer server(true);
2569 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
2570 socket->enableTFO();
2573 socket->connect(&ccb, server.getAddress(), 30);
2576 std::array<char, 128> buf;
2577 memset(buf.data(), 'a', buf.size());
2582 // Loop, although there shouldn't be anything to do.
2585 ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
2587 ASSERT_TRUE(socket->isClosedBySelf());
2588 ASSERT_FALSE(socket->isClosedByPeer());
2592 * Test calling close() immediately after connect()
2594 TEST(AsyncSocketTest, ConnectAndCloseTFO) {
2595 TestServer server(true);
2597 // Connect using a AsyncSocket
2599 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
2600 socket->enableTFO();
2603 socket->connect(&ccb, server.getAddress(), 30);
2607 // Loop, although there shouldn't be anything to do.
2610 // Make sure the connection was aborted
2611 ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
2613 ASSERT_TRUE(socket->isClosedBySelf());
2614 ASSERT_FALSE(socket->isClosedByPeer());
2617 class MockAsyncTFOSocket : public AsyncSocket {
2619 using UniquePtr = std::unique_ptr<MockAsyncTFOSocket, Destructor>;
2621 explicit MockAsyncTFOSocket(EventBase* evb) : AsyncSocket(evb) {}
2623 MOCK_METHOD3(tfoSendMsg, ssize_t(int fd, struct msghdr* msg, int msg_flags));
2626 TEST(AsyncSocketTest, TestTFOUnsupported) {
2627 TestServer server(true);
2629 // Connect using a AsyncSocket
2631 auto socket = MockAsyncTFOSocket::UniquePtr(new MockAsyncTFOSocket(&evb));
2632 socket->enableTFO();
2635 socket->connect(&ccb, server.getAddress(), 30);
2636 ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
2639 socket->setReadCB(&rcb);
2641 EXPECT_CALL(*socket, tfoSendMsg(_, _, _))
2642 .WillOnce(SetErrnoAndReturn(EOPNOTSUPP, -1));
2643 WriteCallback write;
2644 auto sendBuf = IOBuf::copyBuffer("hey");
2645 socket->writeChain(&write, sendBuf->clone());
2646 EXPECT_EQ(STATE_WAITING, write.state);
2648 std::array<uint8_t, 128> buf;
2649 memset(buf.data(), 'a', buf.size());
2651 std::array<uint8_t, 3> readBuf;
2654 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
2655 acceptedSocket->write(buf.data(), buf.size());
2656 acceptedSocket->flush();
2657 acceptedSocket->readAll(readBuf.data(), readBuf.size());
2658 acceptedSocket->close();
2664 EXPECT_EQ(STATE_SUCCEEDED, ccb.state);
2665 EXPECT_EQ(STATE_SUCCEEDED, write.state);
2667 EXPECT_EQ(0, memcmp(readBuf.data(), sendBuf->data(), readBuf.size()));
2668 EXPECT_EQ(STATE_SUCCEEDED, rcb.state);
2669 ASSERT_EQ(1, rcb.buffers.size());
2670 ASSERT_EQ(sizeof(buf), rcb.buffers[0].length);
2671 EXPECT_EQ(0, memcmp(rcb.buffers[0].buffer, buf.data(), buf.size()));
2672 EXPECT_EQ(socket->getTFOFinished(), socket->getTFOSucceded());
2675 TEST(AsyncSocketTest, TestTFOUnsupportedTimeout) {
2676 // Try connecting to server that won't respond.
2678 // This depends somewhat on the network where this test is run.
2679 // Hopefully this IP will be routable but unresponsive.
2680 // (Alternatively, we could try listening on a local raw socket, but that
2681 // normally requires root privileges.)
2682 auto host = SocketAddressTestHelper::isIPv6Enabled()
2683 ? SocketAddressTestHelper::kGooglePublicDnsAAddrIPv6
2684 : SocketAddressTestHelper::isIPv4Enabled()
2685 ? SocketAddressTestHelper::kGooglePublicDnsAAddrIPv4
2687 SocketAddress addr(host, 65535);
2689 // Connect using a AsyncSocket
2691 auto socket = MockAsyncTFOSocket::UniquePtr(new MockAsyncTFOSocket(&evb));
2692 socket->enableTFO();
2695 // Set a very small timeout
2696 socket->connect(&ccb, addr, 1);
2697 EXPECT_EQ(STATE_SUCCEEDED, ccb.state);
2700 socket->setReadCB(&rcb);
2702 EXPECT_CALL(*socket, tfoSendMsg(_, _, _))
2703 .WillOnce(SetErrnoAndReturn(EOPNOTSUPP, -1));
2704 WriteCallback write;
2705 socket->writeChain(&write, IOBuf::copyBuffer("hey"));
2709 EXPECT_EQ(STATE_FAILED, write.state);
2712 TEST(AsyncSocketTest, TestTFOFallbackToConnect) {
2713 TestServer server(true);
2715 // Connect using a AsyncSocket
2717 auto socket = MockAsyncTFOSocket::UniquePtr(new MockAsyncTFOSocket(&evb));
2718 socket->enableTFO();
2721 socket->connect(&ccb, server.getAddress(), 30);
2722 ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
2725 socket->setReadCB(&rcb);
2727 EXPECT_CALL(*socket, tfoSendMsg(_, _, _))
2728 .WillOnce(Invoke([&](int fd, struct msghdr*, int) {
2729 sockaddr_storage addr;
2730 auto len = server.getAddress().getAddress(&addr);
2731 return connect(fd, (const struct sockaddr*)&addr, len);
2733 WriteCallback write;
2734 auto sendBuf = IOBuf::copyBuffer("hey");
2735 socket->writeChain(&write, sendBuf->clone());
2736 EXPECT_EQ(STATE_WAITING, write.state);
2738 std::array<uint8_t, 128> buf;
2739 memset(buf.data(), 'a', buf.size());
2741 std::array<uint8_t, 3> readBuf;
2744 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
2745 acceptedSocket->write(buf.data(), buf.size());
2746 acceptedSocket->flush();
2747 acceptedSocket->readAll(readBuf.data(), readBuf.size());
2748 acceptedSocket->close();
2754 EXPECT_EQ(0, memcmp(readBuf.data(), sendBuf->data(), readBuf.size()));
2756 EXPECT_EQ(STATE_SUCCEEDED, ccb.state);
2757 EXPECT_EQ(STATE_SUCCEEDED, write.state);
2759 EXPECT_EQ(STATE_SUCCEEDED, rcb.state);
2760 ASSERT_EQ(1, rcb.buffers.size());
2761 ASSERT_EQ(buf.size(), rcb.buffers[0].length);
2762 EXPECT_EQ(0, memcmp(rcb.buffers[0].buffer, buf.data(), buf.size()));
2765 TEST(AsyncSocketTest, TestTFOFallbackTimeout) {
2766 // Try connecting to server that won't respond.
2768 // This depends somewhat on the network where this test is run.
2769 // Hopefully this IP will be routable but unresponsive.
2770 // (Alternatively, we could try listening on a local raw socket, but that
2771 // normally requires root privileges.)
2772 auto host = SocketAddressTestHelper::isIPv6Enabled()
2773 ? SocketAddressTestHelper::kGooglePublicDnsAAddrIPv6
2774 : SocketAddressTestHelper::isIPv4Enabled()
2775 ? SocketAddressTestHelper::kGooglePublicDnsAAddrIPv4
2777 SocketAddress addr(host, 65535);
2779 // Connect using a AsyncSocket
2781 auto socket = MockAsyncTFOSocket::UniquePtr(new MockAsyncTFOSocket(&evb));
2782 socket->enableTFO();
2785 // Set a very small timeout
2786 socket->connect(&ccb, addr, 1);
2787 EXPECT_EQ(STATE_SUCCEEDED, ccb.state);
2790 socket->setReadCB(&rcb);
2792 EXPECT_CALL(*socket, tfoSendMsg(_, _, _))
2793 .WillOnce(Invoke([&](int fd, struct msghdr*, int) {
2794 sockaddr_storage addr2;
2795 auto len = addr.getAddress(&addr2);
2796 return connect(fd, (const struct sockaddr*)&addr2, len);
2798 WriteCallback write;
2799 socket->writeChain(&write, IOBuf::copyBuffer("hey"));
2803 EXPECT_EQ(STATE_FAILED, write.state);
2806 TEST(AsyncSocketTest, TestTFOEagain) {
2807 TestServer server(true);
2809 // Connect using a AsyncSocket
2811 auto socket = MockAsyncTFOSocket::UniquePtr(new MockAsyncTFOSocket(&evb));
2812 socket->enableTFO();
2815 socket->connect(&ccb, server.getAddress(), 30);
2817 EXPECT_CALL(*socket, tfoSendMsg(_, _, _))
2818 .WillOnce(SetErrnoAndReturn(EAGAIN, -1));
2819 WriteCallback write;
2820 socket->writeChain(&write, IOBuf::copyBuffer("hey"));
2824 EXPECT_EQ(STATE_SUCCEEDED, ccb.state);
2825 EXPECT_EQ(STATE_FAILED, write.state);
2828 // Sending a large amount of data in the first write which will
2829 // definitely not fit into MSS.
2830 TEST(AsyncSocketTest, ConnectTFOWithBigData) {
2831 // Start listening on a local port
2832 TestServer server(true);
2834 // Connect using a AsyncSocket
2836 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
2837 socket->enableTFO();
2839 socket->connect(&cb, server.getAddress(), 30);
2841 std::array<uint8_t, 128> buf;
2842 memset(buf.data(), 'a', buf.size());
2844 constexpr size_t len = 10 * 1024;
2845 auto sendBuf = IOBuf::create(len);
2846 sendBuf->append(len);
2847 std::array<uint8_t, len> readBuf;
2850 auto acceptedSocket = server.accept();
2851 acceptedSocket->write(buf.data(), buf.size());
2852 acceptedSocket->flush();
2853 acceptedSocket->readAll(readBuf.data(), readBuf.size());
2854 acceptedSocket->close();
2859 ASSERT_EQ(cb.state, STATE_SUCCEEDED);
2860 EXPECT_LE(0, socket->getConnectTime().count());
2861 EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
2862 EXPECT_TRUE(socket->getTFOAttempted());
2864 // Should trigger the connect
2865 WriteCallback write;
2867 socket->writeChain(&write, sendBuf->clone());
2868 socket->setReadCB(&rcb);
2873 EXPECT_EQ(STATE_SUCCEEDED, write.state);
2874 EXPECT_EQ(0, memcmp(readBuf.data(), sendBuf->data(), readBuf.size()));
2875 EXPECT_EQ(STATE_SUCCEEDED, rcb.state);
2876 ASSERT_EQ(1, rcb.buffers.size());
2877 ASSERT_EQ(sizeof(buf), rcb.buffers[0].length);
2878 EXPECT_EQ(0, memcmp(rcb.buffers[0].buffer, buf.data(), buf.size()));
2879 EXPECT_EQ(socket->getTFOFinished(), socket->getTFOSucceded());