2 * Copyright 2016 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 #include <folly/io/async/AsyncServerSocket.h>
17 #include <folly/io/async/AsyncSocket.h>
18 #include <folly/io/async/AsyncTimeout.h>
19 #include <folly/io/async/EventBase.h>
20 #include <folly/RWSpinLock.h>
21 #include <folly/SocketAddress.h>
22 #include <folly/Random.h>
24 #include <folly/io/IOBuf.h>
25 #include <folly/io/async/test/AsyncSocketTest.h>
26 #include <folly/io/async/test/Util.h>
27 #include <folly/portability/Sockets.h>
28 #include <folly/portability/Unistd.h>
29 #include <folly/test/SocketAddressTestHelper.h>
31 #include <gtest/gtest.h>
32 #include <boost/scoped_array.hpp>
35 #include <sys/types.h>
38 using namespace boost;
45 using std::unique_ptr;
46 using std::chrono::milliseconds;
47 using boost::scoped_array;
49 using namespace folly;
// AsyncTimeout subclass that holds an AsyncSocket, an IOBuf chain and a write
// callback, and performs the write from timeoutExpired() rather than at
// construction time.
// NOTE(review): several lines of this class are not visible in this view
// (access specifiers, some member initializers, the declarations of cork_ and
// lastWrite_, and the closing braces).
51 class DelayedWrite: public AsyncTimeout {
// The timeout is registered on the socket's own EventBase. `bufs` is moved
// into the object; `cork` and `lastWrite` are stored for timeoutExpired().
53 DelayedWrite(const std::shared_ptr<AsyncSocket>& socket,
54 unique_ptr<IOBuf>&& bufs, AsyncTransportWrapper::WriteCallback* wcb,
55 bool cork, bool lastWrite = false):
56 AsyncTimeout(socket->getEventBase()),
58 bufs_(std::move(bufs)),
61 lastWrite_(lastWrite) {}
// Timeout handler: writes the stored chain with WriteFlags::CORK when cork_ is
// set and WriteFlags::NONE otherwise. bufs_ is moved-from afterwards.
64 void timeoutExpired() noexcept override {
65 WriteFlags flags = cork_ ? WriteFlags::CORK : WriteFlags::NONE;
66 socket_->writeChain(wcb_, std::move(bufs_), flags);
// NOTE(review): presumably guarded by a lastWrite_ check on a line that is
// missing from this view -- confirm.
68 socket_->shutdownWrite();
72 std::shared_ptr<AsyncSocket> socket_;
73 unique_ptr<IOBuf> bufs_;
74 AsyncTransportWrapper::WriteCallback* wcb_;
79 ///////////////////////////////////////////////////////////////////////////
81 ///////////////////////////////////////////////////////////////////////////
84 * Test connecting to a server
// Connects to server.getAddress() with 30 as the timeout argument, then
// checks that the connect callback reports success.
// NOTE(review): `evb`, `server` and `cb` are used below but their
// declarations are not visible in this view.
86 TEST(AsyncSocketTest, Connect) {
87 // Start listening on a local port
90 // Connect using an AsyncSocket
92 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
94 socket->connect(&cb, server.getAddress(), 30);
98 CHECK_EQ(cb.state, STATE_SUCCEEDED);
// The measured connect time must be non-negative, and the socket must report
// the timeout passed to connect() back as 30 milliseconds.
99 EXPECT_LE(0, socket->getConnectTime().count());
100 EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
104 * Test connecting to a server that isn't listening
// Connects to 127.0.0.1:65535 and expects the connect callback to fail with
// an AsyncSocketException of type NOT_OPEN.
// NOTE(review): `evb` and `cb` are declared on lines not visible here.
106 TEST(AsyncSocketTest, ConnectRefused) {
109 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
111 // Hopefully nothing is actually listening on this address
112 folly::SocketAddress addr("127.0.0.1", 65535);
114 socket->connect(&cb, addr, 30);
118 CHECK_EQ(cb.state, STATE_FAILED);
119 CHECK_EQ(cb.exception.getType(), AsyncSocketException::NOT_OPEN);
// Even for a failed connect, the connect time is non-negative and the
// configured timeout is reported as 30 milliseconds.
120 EXPECT_LE(0, socket->getConnectTime().count());
121 EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
125 * Test connection timeout
// Connects to port 65535 of a public DNS address with a timeout argument of 1
// and expects the connect callback to fail with TIMED_OUT.
// NOTE(review): `evb`, `cb` and the declaration of `host` are on lines not
// visible in this view.
127 TEST(AsyncSocketTest, ConnectTimeout) {
130 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
132 // Try connecting to a server that won't respond.
134 // This depends somewhat on the network where this test is run.
135 // Hopefully this IP will be routable but unresponsive.
136 // (Alternatively, we could try listening on a local raw socket, but that
137 // normally requires root privileges.)
// Address selection: the IPv6 constant when IPv6 is enabled, otherwise the
// IPv4 constant when IPv4 is enabled. The final fallback operand of the
// conditional is not visible here.
139 SocketAddressTestHelper::isIPv6Enabled() ?
140 SocketAddressTestHelper::kGooglePublicDnsAAddrIPv6 :
141 SocketAddressTestHelper::isIPv4Enabled() ?
142 SocketAddressTestHelper::kGooglePublicDnsAAddrIPv4 :
144 SocketAddress addr(host, 65535);
146 socket->connect(&cb, addr, 1); // also set a ridiculously small timeout
150 CHECK_EQ(cb.state, STATE_FAILED);
151 CHECK_EQ(cb.exception.getType(), AsyncSocketException::TIMED_OUT);
153 // Verify that we can still get the peer address after a timeout.
154 // Use case is if the client was created from a client pool, and we want
155 // to log which peer failed.
156 folly::SocketAddress peer;
157 socket->getPeerAddress(&peer);
158 CHECK_EQ(peer, addr);
// The timeout passed to connect() is reported back as 1 millisecond.
159 EXPECT_LE(0, socket->getConnectTime().count());
160 EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(1));
164 * Test writing immediately after connecting, without waiting for connect
// Issues connect() and then write() back to back, and expects both callbacks
// to report success and the server to receive exactly the bytes of `buf`.
// NOTE(review): `evb`, `server`, `ccb`, `wcb` and `buf` are declared on lines
// not visible in this view.
167 TEST(AsyncSocketTest, ConnectAndWrite) {
172 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
174 socket->connect(&ccb, server.getAddress(), 30);
// Fill the whole buffer with 'a' and queue it right after connect().
178 memset(buf, 'a', sizeof(buf));
180 socket->write(&wcb, buf, sizeof(buf));
182 // Loop. We don't bother accepting on the server socket yet.
183 // The kernel should be able to buffer the write request so it can succeed.
186 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
187 CHECK_EQ(wcb.state, STATE_SUCCEEDED);
189 // Make sure the server got a connection and received the data
191 server.verifyConnection(buf, sizeof(buf));
// The socket is expected to have been closed locally, not by the peer.
// NOTE(review): the close call itself is presumably on a line missing from
// this view.
193 ASSERT_TRUE(socket->isClosedBySelf());
194 ASSERT_FALSE(socket->isClosedByPeer());
195 EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
199 * Test connecting using a nullptr connect callback.
// Passes nullptr as the connect callback, so success is observed indirectly
// through a subsequent write and the server-side verification.
// NOTE(review): `evb`, `server`, `wcb` and `buf` are declared on lines not
// visible in this view.
201 TEST(AsyncSocketTest, ConnectNullCallback) {
206 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
207 socket->connect(nullptr, server.getAddress(), 30);
209 // write some data, just so we have some way of verifying
210 // that the socket works correctly after connecting
212 memset(buf, 'a', sizeof(buf));
214 socket->write(&wcb, buf, sizeof(buf));
218 CHECK_EQ(wcb.state, STATE_SUCCEEDED);
220 // Make sure the server got a connection and received the data
222 server.verifyConnection(buf, sizeof(buf));
// The socket is expected to have been closed locally, not by the peer.
224 ASSERT_TRUE(socket->isClosedBySelf());
225 ASSERT_FALSE(socket->isClosedByPeer());
229 * Test calling both write() and close() immediately after connecting, without
230 * waiting for connect to finish.
232 * This exercises the STATE_CONNECTING_CLOSING code.
// Expects both the connect and the write callbacks to succeed, and the server
// to receive the full buffer.
// NOTE(review): the close() call named in the description, and the
// declarations of `evb`, `server`, `ccb`, `wcb` and `buf`, are on lines not
// visible in this view.
234 TEST(AsyncSocketTest, ConnectWriteAndClose) {
239 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
241 socket->connect(&ccb, server.getAddress(), 30);
245 memset(buf, 'a', sizeof(buf));
247 socket->write(&wcb, buf, sizeof(buf));
252 // Loop. We don't bother accepting on the server socket yet.
253 // The kernel should be able to buffer the write request so it can succeed.
256 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
257 CHECK_EQ(wcb.state, STATE_SUCCEEDED);
259 // Make sure the server got a connection and received the data
260 server.verifyConnection(buf, sizeof(buf));
// Closed locally, not by the peer.
262 ASSERT_TRUE(socket->isClosedBySelf());
263 ASSERT_FALSE(socket->isClosedByPeer());
267 * Test calling close() immediately after connect()
// Expects the connect callback to end in STATE_FAILED once the socket is
// closed while the connect is still pending.
// NOTE(review): the close() call, the body of the early-exit branch, and the
// declarations of `evb`, `server` and `ccb` are on lines not visible here.
269 TEST(AsyncSocketTest, ConnectAndClose) {
272 // Connect using an AsyncSocket
274 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
276 socket->connect(&ccb, server.getAddress(), 30);
278 // Hopefully the connect didn't succeed immediately.
279 // If it did, we can't exercise the close-while-connecting code path.
280 if (ccb.state == STATE_SUCCEEDED) {
281 LOG(INFO) << "connect() succeeded immediately; aborting test "
282 "of close-during-connect behavior";
288 // Loop, although there shouldn't be anything to do.
291 // Make sure the connection was aborted
292 CHECK_EQ(ccb.state, STATE_FAILED);
// Closed locally, not by the peer.
294 ASSERT_TRUE(socket->isClosedBySelf());
295 ASSERT_FALSE(socket->isClosedByPeer());
299 * Test calling closeNow() immediately after connect()
301 * This should be identical to the normal close behavior.
// Same assertions as the close() variant: the connect callback must end in
// STATE_FAILED and the socket must report being closed locally.
// NOTE(review): the closeNow() call, the body of the early-exit branch, and
// the declarations of `evb`, `server` and `ccb` are on lines not visible here.
303 TEST(AsyncSocketTest, ConnectAndCloseNow) {
306 // Connect using an AsyncSocket
308 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
310 socket->connect(&ccb, server.getAddress(), 30);
312 // Hopefully the connect didn't succeed immediately.
313 // If it did, we can't exercise the close-while-connecting code path.
314 if (ccb.state == STATE_SUCCEEDED) {
315 LOG(INFO) << "connect() succeeded immediately; aborting test "
316 "of closeNow()-during-connect behavior";
322 // Loop, although there shouldn't be anything to do.
325 // Make sure the connection was aborted
326 CHECK_EQ(ccb.state, STATE_FAILED);
// Closed locally, not by the peer.
328 ASSERT_TRUE(socket->isClosedBySelf());
329 ASSERT_FALSE(socket->isClosedByPeer());
333 * Test calling both write() and closeNow() immediately after connecting,
334 * without waiting for connect to finish.
336 * This should abort the pending write.
// Expects both the connect callback and the write callback to end in
// STATE_FAILED.
// NOTE(review): the closeNow() call, the body of the early-exit branch, and
// the declarations of `evb`, `server`, `ccb`, `wcb` and `buf` are on lines not
// visible here.
338 TEST(AsyncSocketTest, ConnectWriteAndCloseNow) {
343 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
345 socket->connect(&ccb, server.getAddress(), 30);
347 // Hopefully the connect didn't succeed immediately.
348 // If it did, we can't exercise the close-while-connecting code path.
349 if (ccb.state == STATE_SUCCEEDED) {
350 LOG(INFO) << "connect() succeeded immediately; aborting test "
351 "of write-during-connect behavior";
// Queue a write while the connect is still pending.
357 memset(buf, 'a', sizeof(buf));
359 socket->write(&wcb, buf, sizeof(buf));
364 // Loop, although there shouldn't be anything to do.
367 CHECK_EQ(ccb.state, STATE_FAILED);
368 CHECK_EQ(wcb.state, STATE_FAILED);
// Closed locally, not by the peer.
370 ASSERT_TRUE(socket->isClosedBySelf());
371 ASSERT_FALSE(socket->isClosedByPeer());
375 * Test installing a read callback immediately, before connect() finishes.
// Installs the read callback right after connect(), has the accepted
// (server-side) socket send a buffer and close, and expects the read callback
// to have received exactly one buffer with identical contents.
// NOTE(review): `evb`, `server`, `ccb`, `rcb` and `buf` are declared on lines
// not visible in this view.
377 TEST(AsyncSocketTest, ConnectAndRead) {
382 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
384 socket->connect(&ccb, server.getAddress(), 30);
387 socket->setReadCB(&rcb);
389 // Even though we haven't looped yet, we should be able to accept
390 // the connection and send data to it.
391 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
393 memset(buf, 'a', sizeof(buf));
394 acceptedSocket->write(buf, sizeof(buf));
395 acceptedSocket->flush();
396 acceptedSocket->close();
398 // Loop, although there shouldn't be anything to do.
// One buffer, of the full length, byte-for-byte equal to what was sent.
401 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
402 CHECK_EQ(rcb.state, STATE_SUCCEEDED);
403 CHECK_EQ(rcb.buffers.size(), 1);
404 CHECK_EQ(rcb.buffers[0].length, sizeof(buf));
405 CHECK_EQ(memcmp(rcb.buffers[0].buffer, buf, sizeof(buf)), 0);
// Neither closed-by-self nor closed-by-peer is expected to be reported here.
407 ASSERT_FALSE(socket->isClosedBySelf());
408 ASSERT_FALSE(socket->isClosedByPeer());
412 * Test installing a read callback and then closing immediately before the
413 * connect attempt finishes.
// Expects the connect callback to fail, and the read callback to have received
// no buffers while still ending in STATE_SUCCEEDED.
// NOTE(review): the close call, the body of the early-exit branch, and the
// declarations of `evb`, `server`, `ccb` and `rcb` are on lines not visible
// here.
415 TEST(AsyncSocketTest, ConnectReadAndClose) {
420 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
422 socket->connect(&ccb, server.getAddress(), 30);
424 // Hopefully the connect didn't succeed immediately.
425 // If it did, we can't exercise the close-while-connecting code path.
426 if (ccb.state == STATE_SUCCEEDED) {
427 LOG(INFO) << "connect() succeeded immediately; aborting test "
428 "of read-during-connect behavior";
433 socket->setReadCB(&rcb);
438 // Loop, although there shouldn't be anything to do.
441 CHECK_EQ(ccb.state, STATE_FAILED); // we aborted the connect attempt
442 CHECK_EQ(rcb.buffers.size(), 0);
443 CHECK_EQ(rcb.state, STATE_SUCCEEDED); // this indicates EOF
// Closed locally, not by the peer.
445 ASSERT_TRUE(socket->isClosedBySelf());
446 ASSERT_FALSE(socket->isClosedByPeer());
450 * Test both writing and installing a read callback immediately,
451 * before connect() finishes.
// The AsyncSocket writes buf1 ('a' bytes) while the accepted socket writes
// buf2 ('b' bytes); each side must receive the other's data intact.
// NOTE(review): `evb`, `server`, `ccb`, `wcb`, `rcb`, `buf1` and `buf2` are
// declared on lines not visible in this view.
453 TEST(AsyncSocketTest, ConnectWriteAndRead) {
458 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
460 socket->connect(&ccb, server.getAddress(), 30);
464 memset(buf1, 'a', sizeof(buf1));
466 socket->write(&wcb, buf1, sizeof(buf1));
468 // set a read callback
470 socket->setReadCB(&rcb);
472 // Even though we haven't looped yet, we should be able to accept
473 // the connection and send data to it.
474 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
476 memset(buf2, 'b', sizeof(buf2));
477 acceptedSocket->write(buf2, sizeof(buf2));
478 acceptedSocket->flush();
480 // shut down the write half of acceptedSocket, so that the AsyncSocket
481 // will stop reading and we can break out of the event loop.
482 shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
487 // Make sure the connect succeeded
488 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
490 // Make sure the AsyncSocket read the data written by the accepted socket
491 CHECK_EQ(rcb.state, STATE_SUCCEEDED);
492 CHECK_EQ(rcb.buffers.size(), 1);
493 CHECK_EQ(rcb.buffers[0].length, sizeof(buf2));
494 CHECK_EQ(memcmp(rcb.buffers[0].buffer, buf2, sizeof(buf2)), 0);
496 // Close the AsyncSocket so we'll see EOF on acceptedSocket
499 // Make sure the accepted socket saw the data written by the AsyncSocket
// readAll() fills readbuf with sizeof(buf1) bytes; the following read() is
// expected to return 0.
500 uint8_t readbuf[sizeof(buf1)];
501 acceptedSocket->readAll(readbuf, sizeof(readbuf));
502 CHECK_EQ(memcmp(buf1, readbuf, sizeof(buf1)), 0);
503 uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
504 CHECK_EQ(bytesRead, 0);
// Here the peer's half-shutdown is what gets reported as the close.
506 ASSERT_FALSE(socket->isClosedBySelf());
507 ASSERT_TRUE(socket->isClosedByPeer());
511 * Test writing to the socket then shutting down writes before the connect
// Queues a write and a write-shutdown while the connect is pending, then
// checks that the accepted socket receives the data followed by EOF, and that
// data sent the other way is still readable by the AsyncSocket.
// NOTE(review): `evb`, `server`, `ccb`, `wcb`, `rcb` and `wbuf` are declared
// on lines not visible in this view.
514 TEST(AsyncSocketTest, ConnectWriteAndShutdownWrite) {
519 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
521 socket->connect(&ccb, server.getAddress(), 30);
523 // Hopefully the connect didn't succeed immediately.
524 // If it did, we can't exercise the write-while-connecting code path.
525 if (ccb.state == STATE_SUCCEEDED) {
526 LOG(INFO) << "connect() succeeded immediately; skipping test";
530 // Ask to write some data
532 memset(wbuf, 'a', sizeof(wbuf));
534 socket->write(&wcb, wbuf, sizeof(wbuf));
// NOTE(review): shutdownWrite() is called twice in a row here (a comment line
// between the two calls appears to be missing from this view). Confirm
// whether the repetition is intentional.
535 socket->shutdownWrite();
538 socket->shutdownWrite();
540 // Even though we haven't looped yet, we should be able to accept
542 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
544 // Since the connection is still in progress, there should be no data to
545 // read yet. Verify that the accepted socket is not readable.
// poll() with a zero timeout returns immediately; the check on `rc` is on a
// line not visible here.
546 struct pollfd fds[1];
547 fds[0].fd = acceptedSocket->getSocketFD();
548 fds[0].events = POLLIN;
550 int rc = poll(fds, 1, 0);
553 // Write data to the accepted socket
554 uint8_t acceptedWbuf[192];
555 memset(acceptedWbuf, 'b', sizeof(acceptedWbuf));
556 acceptedSocket->write(acceptedWbuf, sizeof(acceptedWbuf));
557 acceptedSocket->flush();
562 // The loop should have completed the connection, written the queued data,
563 // and shutdown writes on the socket.
565 // Check that the connection was completed successfully and that the write
566 // callback succeeded.
567 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
568 CHECK_EQ(wcb.state, STATE_SUCCEEDED);
570 // Check that we can read the data that was written to the socket, and that
571 // we see an EOF, since its socket was half-shutdown.
572 uint8_t readbuf[sizeof(wbuf)];
573 acceptedSocket->readAll(readbuf, sizeof(readbuf));
574 CHECK_EQ(memcmp(wbuf, readbuf, sizeof(wbuf)), 0);
575 uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
576 CHECK_EQ(bytesRead, 0);
578 // Close the accepted socket. This will cause it to see EOF
579 // and uninstall the read callback when we loop next.
580 acceptedSocket->close();
582 // Install a read callback, then loop again.
584 socket->setReadCB(&rcb);
587 // This loop should have read the data and seen the EOF
588 CHECK_EQ(rcb.state, STATE_SUCCEEDED);
589 CHECK_EQ(rcb.buffers.size(), 1);
590 CHECK_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
591 CHECK_EQ(memcmp(rcb.buffers[0].buffer,
592 acceptedWbuf, sizeof(acceptedWbuf)), 0);
// Neither closed-by-self nor closed-by-peer is expected to be reported here.
594 ASSERT_FALSE(socket->isClosedBySelf());
595 ASSERT_FALSE(socket->isClosedByPeer());
599 * Test reading, writing, and shutting down writes before the connect attempt
// Installs a read callback, queues a write and a write-shutdown while the
// connect is pending, then checks that both directions delivered their data
// and that the accepted socket sees EOF.
// NOTE(review): `evb`, `server`, `ccb`, `wcb`, `rcb` and `wbuf` are declared
// on lines not visible in this view.
602 TEST(AsyncSocketTest, ConnectReadWriteAndShutdownWrite) {
607 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
609 socket->connect(&ccb, server.getAddress(), 30);
611 // Hopefully the connect didn't succeed immediately.
612 // If it did, we can't exercise the write-while-connecting code path.
613 if (ccb.state == STATE_SUCCEEDED) {
614 LOG(INFO) << "connect() succeeded immediately; skipping test";
618 // Install a read callback
620 socket->setReadCB(&rcb);
622 // Ask to write some data
624 memset(wbuf, 'a', sizeof(wbuf));
626 socket->write(&wcb, wbuf, sizeof(wbuf));
629 socket->shutdownWrite();
631 // Even though we haven't looped yet, we should be able to accept
633 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
635 // Since the connection is still in progress, there should be no data to
636 // read yet. Verify that the accepted socket is not readable.
// poll() with a zero timeout returns immediately; the check on `rc` is on a
// line not visible here.
637 struct pollfd fds[1];
638 fds[0].fd = acceptedSocket->getSocketFD();
639 fds[0].events = POLLIN;
641 int rc = poll(fds, 1, 0);
644 // Write data to the accepted socket
645 uint8_t acceptedWbuf[192];
646 memset(acceptedWbuf, 'b', sizeof(acceptedWbuf));
647 acceptedSocket->write(acceptedWbuf, sizeof(acceptedWbuf));
648 acceptedSocket->flush();
649 // Shutdown writes to the accepted socket. This will cause it to see EOF
650 // and uninstall the read callback.
651 ::shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
656 // The loop should have completed the connection, written the queued data,
657 // shutdown writes on the socket, read the data we wrote to it, and see the
660 // Check that the connection was completed successfully and that the read
661 // and write callbacks were invoked as expected.
662 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
663 CHECK_EQ(rcb.state, STATE_SUCCEEDED);
664 CHECK_EQ(rcb.buffers.size(), 1);
665 CHECK_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
666 CHECK_EQ(memcmp(rcb.buffers[0].buffer,
667 acceptedWbuf, sizeof(acceptedWbuf)), 0);
668 CHECK_EQ(wcb.state, STATE_SUCCEEDED);
670 // Check that we can read the data that was written to the socket, and that
671 // we see an EOF, since its socket was half-shutdown.
672 uint8_t readbuf[sizeof(wbuf)];
673 acceptedSocket->readAll(readbuf, sizeof(readbuf));
674 CHECK_EQ(memcmp(wbuf, readbuf, sizeof(wbuf)), 0);
675 uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
676 CHECK_EQ(bytesRead, 0);
678 // Fully close both sockets
679 acceptedSocket->close();
// The close is expected to be attributed to the peer, not to this socket.
682 ASSERT_FALSE(socket->isClosedBySelf());
683 ASSERT_TRUE(socket->isClosedByPeer());
687 * Test reading, writing, and calling shutdownWriteNow() before the
688 * connect attempt finishes.
// Unlike the shutdownWrite() variant, the queued write is expected to fail
// immediately with zero bytes written, and the accepted socket is expected to
// see EOF without receiving any data.
// NOTE(review): `evb`, `server`, `ccb`, `wcb`, `rcb` and `wbuf` are declared
// on lines not visible in this view.
690 TEST(AsyncSocketTest, ConnectReadWriteAndShutdownWriteNow) {
695 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
697 socket->connect(&ccb, server.getAddress(), 30);
699 // Hopefully the connect didn't succeed immediately.
700 // If it did, we can't exercise the write-while-connecting code path.
701 if (ccb.state == STATE_SUCCEEDED) {
702 LOG(INFO) << "connect() succeeded immediately; skipping test";
706 // Install a read callback
708 socket->setReadCB(&rcb);
710 // Ask to write some data
712 memset(wbuf, 'a', sizeof(wbuf));
714 socket->write(&wcb, wbuf, sizeof(wbuf));
716 // Shutdown writes immediately.
717 // This should immediately discard the data that we just tried to write.
718 socket->shutdownWriteNow();
720 // Verify that writeError() was invoked on the write callback.
// Checked before any event-loop iteration: the failure must be synchronous.
721 CHECK_EQ(wcb.state, STATE_FAILED);
722 CHECK_EQ(wcb.bytesWritten, 0);
724 // Even though we haven't looped yet, we should be able to accept
726 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
728 // Since the connection is still in progress, there should be no data to
729 // read yet. Verify that the accepted socket is not readable.
// poll() with a zero timeout returns immediately; the check on `rc` is on a
// line not visible here.
730 struct pollfd fds[1];
731 fds[0].fd = acceptedSocket->getSocketFD();
732 fds[0].events = POLLIN;
734 int rc = poll(fds, 1, 0);
737 // Write data to the accepted socket
738 uint8_t acceptedWbuf[192];
739 memset(acceptedWbuf, 'b', sizeof(acceptedWbuf));
740 acceptedSocket->write(acceptedWbuf, sizeof(acceptedWbuf));
741 acceptedSocket->flush();
742 // Shutdown writes to the accepted socket. This will cause it to see EOF
743 // and uninstall the read callback.
744 ::shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
749 // The loop should have completed the connection, written the queued data,
750 // shutdown writes on the socket, read the data we wrote to it, and see the
753 // Check that the connection was completed successfully and that the read
754 // callback was invoked as expected.
755 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
756 CHECK_EQ(rcb.state, STATE_SUCCEEDED);
757 CHECK_EQ(rcb.buffers.size(), 1);
758 CHECK_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
759 CHECK_EQ(memcmp(rcb.buffers[0].buffer,
760 acceptedWbuf, sizeof(acceptedWbuf)), 0);
762 // Since we used shutdownWriteNow(), it should have discarded all pending
763 // write data. Verify we see an immediate EOF when reading from the accepted
765 uint8_t readbuf[sizeof(wbuf)];
766 uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
767 CHECK_EQ(bytesRead, 0);
769 // Fully close both sockets
770 acceptedSocket->close();
// The close is expected to be attributed to the peer, not to this socket.
773 ASSERT_FALSE(socket->isClosedBySelf());
774 ASSERT_TRUE(socket->isClosedByPeer());
777 // Helper function for use in testConnectOptWrite()
778 // Temporarily disable the read callback
// socket: the socket whose read callback is removed and later restored.
// rcb:    the callback to reinstall.
// The socket is bound as a raw pointer, so it must still be alive when the
// scheduled call runs.
779 void tmpDisableReads(AsyncSocket* socket, ReadCallback* rcb) {
780 // Uninstall the read callback
781 socket->setReadCB(nullptr);
782 // Schedule the read callback to be reinstalled via EventBase::runInLoop()
// (no explicit delay is passed here)
783 socket->getEventBase()->runInLoop(
784 std::bind(&AsyncSocket::setReadCB, socket, rcb));
788 * Test connect+write, then have the connect callback perform another write.
790 * This tests interaction of the optimistic writing after connect with
791 * additional write attempts that occur in the connect callback.
// size1: number of 'a' bytes written immediately after connect().
// size2: number of 'b' bytes written from the connect success callback.
// close: selects the close-before-connect-completes variant.
// NOTE(review): the branches that test size1, size2 and close, and the
// declarations of `evb`, `server`, `ccb`, `wcb1`, `wcb2` and `rcb`, are on
// lines not visible in this view.
793 void testConnectOptWrite(size_t size1, size_t size2, bool close = false) {
796 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
800 socket->connect(&ccb, server.getAddress(), 30);
802 // Hopefully the connect didn't succeed immediately.
803 // If it did, we can't exercise the optimistic write code path.
804 if (ccb.state == STATE_SUCCEEDED) {
805 LOG(INFO) << "connect() succeeded immediately; aborting test "
806 "of optimistic write behavior";
810 // Tell the connect callback to perform a write when the connect succeeds
812 scoped_array<char> buf2(new char[size2]);
813 memset(buf2.get(), 'b', size2);
// Both lambdas capture by reference; the captured locals stay alive for the
// rest of this function.
815 ccb.successCallback = [&] { socket->write(&wcb2, buf2.get(), size2); };
816 // Tell the second write callback to close the connection when it is done
817 wcb2.successCallback = [&] { socket->closeNow(); };
820 // Schedule one write() immediately, before the connect finishes
821 scoped_array<char> buf1(new char[size1]);
822 memset(buf1.get(), 'a', size1);
825 socket->write(&wcb1, buf1.get(), size1);
829 // immediately perform a close, before connect() completes
833 // Start reading from the other endpoint after 10ms.
834 // If we're using large buffers, we have to read so that the writes don't
836 std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
// Each time data arrives, tmpDisableReads() removes the read callback and
// schedules it to be reinstalled.
838 rcb.dataAvailableCallback = std::bind(tmpDisableReads,
839 acceptedSocket.get(), &rcb);
// The delay argument to tryRunAfterDelay() is on a line not visible here.
840 socket->getEventBase()->tryRunAfterDelay(
841 std::bind(&AsyncSocket::setReadCB, acceptedSocket.get(), &rcb),
844 // Loop. We don't bother accepting on the server socket yet.
845 // The kernel should be able to buffer the write request so it can succeed.
848 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
850 CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
853 CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
858 // Make sure the read callback received all of the data
// Walk the received buffers in order. [start, end) is the byte range of the
// overall stream covered by the current buffer; the part below size1 is
// compared against buf1 and the part at or above size1 against buf2.
859 size_t bytesRead = 0;
860 for (vector<ReadCallback::Buffer>::const_iterator it = rcb.buffers.begin();
861 it != rcb.buffers.end();
863 size_t start = bytesRead;
864 bytesRead += it->length;
865 size_t end = bytesRead;
867 size_t cmpLen = min(size1, end) - start;
868 CHECK_EQ(memcmp(it->buffer, buf1.get() + start, cmpLen), 0);
870 if (end > size1 && end <= size1 + size2) {
// A buffer that starts inside buf2's range is compared from its beginning;
// one that straddles the buf1/buf2 boundary is compared from offset
// size1 - start.
874 if (start >= size1) {
876 buf2Offset = start - size1;
877 cmpLen = end - start;
879 itOffset = size1 - start;
881 cmpLen = end - size1;
883 CHECK_EQ(memcmp(it->buffer + itOffset, buf2.get() + buf2Offset,
// Every byte of both writes must have been received.
888 CHECK_EQ(bytesRead, size1 + size2);
// Drives testConnectOptWrite() across combinations of small, large (8 MiB)
// and zero-length writes, with and without the `close` flag.
891 TEST(AsyncSocketTest, ConnectCallbackWrite) {
892 // Test using small writes that should both succeed immediately
893 testConnectOptWrite(100, 200);
895 // Test using a large buffer in the connect callback, that should block
896 const size_t largeSize = 8*1024*1024;
897 testConnectOptWrite(100, largeSize);
899 // Test using a large initial write
900 testConnectOptWrite(largeSize, 100);
902 // Test using two large buffers
903 testConnectOptWrite(largeSize, largeSize);
905 // Test a small write in the connect callback,
906 // but no immediate write before connect completes
907 testConnectOptWrite(0, 64);
909 // Test a large write in the connect callback,
910 // but no immediate write before connect completes
911 testConnectOptWrite(0, largeSize);
913 // Test connect, a small write, then immediately call close() before connect
915 testConnectOptWrite(211, 0, true);
917 // Test connect, a large immediate write (that will block), then immediately
918 // call close() before connect completes
919 testConnectOptWrite(largeSize, 0, true);
922 ///////////////////////////////////////////////////////////////////////////
923 // write() related tests
924 ///////////////////////////////////////////////////////////////////////////
927 * Test writing using a nullptr callback
// With no write callback to inspect, success is checked through the server's
// verifyConnection() on the bytes of `buf`.
// NOTE(review): `evb`, `server` and `buf` are declared on lines not visible
// in this view.
929 TEST(AsyncSocketTest, WriteNullCallback) {
// This newSocket() overload takes the address and the value 30 directly.
934 std::shared_ptr<AsyncSocket> socket =
935 AsyncSocket::newSocket(&evb, server.getAddress(), 30);
936 evb.loop(); // loop until the socket is connected
938 // write() with a nullptr callback
940 memset(buf, 'a', sizeof(buf));
941 socket->write(nullptr, buf, sizeof(buf));
943 evb.loop(); // loop until the data is sent
945 // Make sure the server got a connection and received the data
947 server.verifyConnection(buf, sizeof(buf));
// Closed locally, not by the peer.
949 ASSERT_TRUE(socket->isClosedBySelf());
950 ASSERT_FALSE(socket->isClosedByPeer());
954 * Test writing with a send timeout
// Writes 8 MiB with a send timeout of 200 while nothing reads on the other
// end, and expects the write callback to fail with TIMED_OUT.
// NOTE(review): `evb`, `server`, `wcb` and the `start`/`end` time points are
// declared on lines not visible in this view.
956 TEST(AsyncSocketTest, WriteTimeout) {
961 std::shared_ptr<AsyncSocket> socket =
962 AsyncSocket::newSocket(&evb, server.getAddress(), 30);
963 evb.loop(); // loop until the socket is connected
965 // write() a large chunk of data, with no-one on the other end reading
966 size_t writeLength = 8*1024*1024;
967 uint32_t timeout = 200;
968 socket->setSendTimeout(timeout);
969 scoped_array<char> buf(new char[writeLength]);
970 memset(buf.get(), 'a', writeLength);
972 socket->write(&wcb, buf.get(), writeLength);
978 // Make sure the write attempt timed out as requested
979 CHECK_EQ(wcb.state, STATE_FAILED);
980 CHECK_EQ(wcb.exception.getType(), AsyncSocketException::TIMED_OUT);
982 // Check that the write timed out within a reasonable period of time.
983 // We don't check for exactly the specified timeout, since AsyncSocket only
984 // times out when it hasn't made progress for that period of time.
986 // On linux, the first write sends a few hundred kb of data, then blocks for
987 // writability, and then unblocks again after 40ms and is able to write
988 // another smaller chunk of data before blocking permanently. Therefore it
989 // doesn't time out until 40ms + timeout.
991 // I haven't fully verified the cause of this, but I believe it probably
992 // occurs because the receiving end delays sending an ack for up to 40ms.
993 // (This is the default value for TCP_DELACK_MIN.) Once the sender receives
994 // the ack, it can send some more data. However, after that point the
995 // receiver's kernel buffer is full. This 40ms delay happens even with
996 // TCP_NODELAY and TCP_QUICKACK enabled on both endpoints. However, the
997 // kernel may be automatically disabling TCP_QUICKACK after receiving some
1000 // For now, we simply check that the timeout occurred within 160ms of
1001 // the requested value.
1002 T_CHECK_TIMEOUT(start, end, milliseconds(timeout), milliseconds(160));
1006 * Test writing to a socket that the remote endpoint has closed
// Accepts the connection, drops the accepted socket, then writes 8 MiB and
// expects the write callback to fail with INTERNAL_ERROR.
// NOTE(review): `evb`, `server` and `wcb` are declared on lines not visible
// in this view.
1008 TEST(AsyncSocketTest, WritePipeError) {
1013 std::shared_ptr<AsyncSocket> socket =
1014 AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1015 socket->setSendTimeout(1000);
1016 evb.loop(); // loop until the socket is connected
1018 // accept and immediately close the socket
// Resetting the shared_ptr releases this test's reference to the accepted
// socket.
1019 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
1020 acceptedSocket.reset();
1022 // write() a large chunk of data
1023 size_t writeLength = 8*1024*1024;
1024 scoped_array<char> buf(new char[writeLength]);
1025 memset(buf.get(), 'a', writeLength);
1027 socket->write(&wcb, buf.get(), writeLength);
1031 // Make sure the write failed.
1032 // It would be nice if AsyncSocketException could convey the errno value,
1033 // so that we could check for EPIPE
1034 CHECK_EQ(wcb.state, STATE_FAILED);
1035 CHECK_EQ(wcb.exception.getType(),
1036 AsyncSocketException::INTERNAL_ERROR);
// Neither closed-by-self nor closed-by-peer is expected to be reported here.
1038 ASSERT_FALSE(socket->isClosedBySelf());
1039 ASSERT_FALSE(socket->isClosedByPeer());
1043 * Test writing a mix of simple buffers and IOBufs
// Sends a raw buffer ('a' x 5), a single-element IOBuf chain ('b' x 7) and a
// two-element IOBuf chain ('c' x 11 followed by 'd' x 13), then checks that
// the reader received them as one buffer in that order.
// NOTE(review): `evb`, `server`, `ccb`, `wcb`, `wcb2`, `wcb3` and `rcb` are
// declared on lines not visible in this view.
1045 TEST(AsyncSocketTest, WriteIOBuf) {
1050 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
1052 socket->connect(&ccb, server.getAddress(), 30);
1054 // Accept the connection
1055 std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
1057 acceptedSocket->setReadCB(&rcb);
1059 // Write a simple buffer to the socket
// The length is const so that simpleBuf is an ordinary fixed-size array
// rather than a variable-length array, which is not standard C++.
1060 const size_t simpleBufLength = 5;
1061 char simpleBuf[simpleBufLength];
1062 memset(simpleBuf, 'a', simpleBufLength);
1064 socket->write(&wcb, simpleBuf, simpleBufLength);
1066 // Write a single-element IOBuf chain
1067 size_t buf1Length = 7;
1068 unique_ptr<IOBuf> buf1(IOBuf::create(buf1Length));
1069 memset(buf1->writableData(), 'b', buf1Length);
1070 buf1->append(buf1Length);
// Keep a clone for the comparison below, since buf1 itself is moved into
// writeChain().
1071 unique_ptr<IOBuf> buf1Copy(buf1->clone());
1073 socket->writeChain(&wcb2, std::move(buf1));
1075 // Write a multiple-element IOBuf chain
1076 size_t buf2Length = 11;
1077 unique_ptr<IOBuf> buf2(IOBuf::create(buf2Length));
1078 memset(buf2->writableData(), 'c', buf2Length);
1079 buf2->append(buf2Length);
1080 size_t buf3Length = 13;
1081 unique_ptr<IOBuf> buf3(IOBuf::create(buf3Length));
1082 memset(buf3->writableData(), 'd', buf3Length);
1083 buf3->append(buf3Length);
1084 buf2->appendChain(std::move(buf3));
// The clone is coalesced so that the whole chain can be compared with a
// single memcmp below.
1085 unique_ptr<IOBuf> buf2Copy(buf2->clone());
1086 buf2Copy->coalesce();
1088 socket->writeChain(&wcb3, std::move(buf2));
1089 socket->shutdownWrite();
1091 // Let the reads and writes run to completion
1094 CHECK_EQ(wcb.state, STATE_SUCCEEDED);
1095 CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
1096 CHECK_EQ(wcb3.state, STATE_SUCCEEDED);
1098 // Make sure the reader got the right data in the right order
1099 CHECK_EQ(rcb.state, STATE_SUCCEEDED);
1100 CHECK_EQ(rcb.buffers.size(), 1);
1101 CHECK_EQ(rcb.buffers[0].length,
1102 simpleBufLength + buf1Length + buf2Length + buf3Length);
// The three comparisons below check, in order, the raw buffer, the first
// chain and the coalesced second chain at their expected offsets.
1104 memcmp(rcb.buffers[0].buffer, simpleBuf, simpleBufLength), 0);
1106 memcmp(rcb.buffers[0].buffer + simpleBufLength,
1107 buf1Copy->data(), buf1Copy->length()), 0);
1109 memcmp(rcb.buffers[0].buffer + simpleBufLength + buf1Length,
1110 buf2Copy->data(), buf2Copy->length()), 0);
1112 acceptedSocket->close();
// Closed locally, not by the peer.
1115 ASSERT_TRUE(socket->isClosedBySelf());
1116 ASSERT_FALSE(socket->isClosedByPeer());
// Checks that a write issued with the cork flag is delivered to the reader
// together with the following write rather than on its own.
// NOTE(review): `evb`, `server`, `ccb`, `wcb1`, `wcb2`, `wcb3` and `rcb` are
// declared on lines not visible in this view.
1119 TEST(AsyncSocketTest, WriteIOBufCorked) {
1124 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
1126 socket->connect(&ccb, server.getAddress(), 30);
1128 // Accept the connection
1129 std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
1131 acceptedSocket->setReadCB(&rcb);
1133 // Do three writes: the first immediately, the second after a timeout of 100
1134 // with the "cork" flag set, and the third after a timeout of 140. The reader
1135 // should see the first write arrive by itself, followed by the second and
1136 // third writes arriving together.
1137 size_t buf1Length = 5;
1138 unique_ptr<IOBuf> buf1(IOBuf::create(buf1Length));
1139 memset(buf1->writableData(), 'a', buf1Length);
1140 buf1->append(buf1Length);
1141 size_t buf2Length = 7;
1142 unique_ptr<IOBuf> buf2(IOBuf::create(buf2Length));
1143 memset(buf2->writableData(), 'b', buf2Length);
1144 buf2->append(buf2Length);
1145 size_t buf3Length = 11;
1146 unique_ptr<IOBuf> buf3(IOBuf::create(buf3Length));
1147 memset(buf3->writableData(), 'c', buf3Length);
1148 buf3->append(buf3Length);
1150 socket->writeChain(&wcb1, std::move(buf1));
// write2 is constructed with cork = true.
1152 DelayedWrite write2(socket, std::move(buf2), &wcb2, true);
1153 write2.scheduleTimeout(100);
// write3 is constructed with cork = false and lastWrite = true.
1155 DelayedWrite write3(socket, std::move(buf3), &wcb3, false, true);
1156 write3.scheduleTimeout(140);
1159 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
1160 CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
1161 CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
// Rethrow the stored exception so a failure of the third write surfaces the
// underlying error rather than only a state mismatch.
1162 if (wcb3.state != STATE_SUCCEEDED) {
1163 throw(wcb3.exception);
1165 CHECK_EQ(wcb3.state, STATE_SUCCEEDED);
1167 // Make sure the reader got the data with the right grouping
// Two buffers: the first write alone, then the second and third combined.
1168 CHECK_EQ(rcb.state, STATE_SUCCEEDED);
1169 CHECK_EQ(rcb.buffers.size(), 2);
1170 CHECK_EQ(rcb.buffers[0].length, buf1Length);
1171 CHECK_EQ(rcb.buffers[1].length, buf2Length + buf3Length);
1173 acceptedSocket->close();
// Closed locally, not by the peer.
1176 ASSERT_TRUE(socket->isClosedBySelf());
1177 ASSERT_FALSE(socket->isClosedByPeer());
1181 * Test performing a zero-length write
// Interleaves zero-length write() calls with two real 1 MiB writes and checks
// that every write callback succeeds and that the reader receives exactly the
// len1 + len2 payload bytes.
1183 TEST(AsyncSocketTest, ZeroLengthWrite) {
1188 std::shared_ptr<AsyncSocket> socket =
1189 AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1190 evb.loop(); // loop until the socket is connected
1192 auto acceptedSocket = server.acceptAsync(&evb);
1194 acceptedSocket->setReadCB(&rcb);
1196 size_t len1 = 1024*1024;
1197 size_t len2 = 1024*1024;
1198 std::unique_ptr<char[]> buf(new char[len1 + len2]);
// Fill the two halves with different bytes. The second fill starts at
// offset len1 so the whole buffer is initialized and a misordered or
// misplaced chunk would be visible to verifyData().
1199 memset(buf.get(), 'a', len1);
1200 memset(buf.get() + len1, 'b', len2);
// wcb1 and wcb3 are the zero-length writes; wcb2 and wcb4 carry the data.
1206 socket->write(&wcb1, buf.get(), 0);
1207 socket->write(&wcb2, buf.get(), len1);
1208 socket->write(&wcb3, buf.get() + len1, 0);
1209 socket->write(&wcb4, buf.get() + len1, len2);
1212 evb.loop(); // loop until the data is sent
1214 CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
1215 CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
1216 CHECK_EQ(wcb3.state, STATE_SUCCEEDED);
1217 CHECK_EQ(wcb4.state, STATE_SUCCEEDED);
1218 rcb.verifyData(buf.get(), len1 + len2);
1220 ASSERT_TRUE(socket->isClosedBySelf());
1221 ASSERT_FALSE(socket->isClosedByPeer());
// writev() counterpart of ZeroLengthWrite: submits a single four-entry iovec
// and checks that the write succeeds and the reader receives exactly the
// len1 + len2 payload bytes.
1224 TEST(AsyncSocketTest, ZeroLengthWritev) {
1229 std::shared_ptr<AsyncSocket> socket =
1230 AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1231 evb.loop(); // loop until the socket is connected
1233 auto acceptedSocket = server.acceptAsync(&evb);
1235 acceptedSocket->setReadCB(&rcb);
1237 size_t len1 = 1024*1024;
1238 size_t len2 = 1024*1024;
1239 std::unique_ptr<char[]> buf(new char[len1 + len2]);
// Fill the two halves with different bytes. The second fill starts at
// offset len1 so the whole buffer is initialized and a misordered or
// misplaced chunk would be visible to verifyData().
1240 memset(buf.get(), 'a', len1);
1241 memset(buf.get() + len1, 'b', len2);
// Entries 0 and 2 carry the two data halves; entries 1 and 3 are presumably
// the zero-length entries the test is named for (confirm against iov_len).
1244 size_t iovCount = 4;
1245 struct iovec iov[iovCount];
1246 iov[0].iov_base = buf.get();
1247 iov[0].iov_len = len1;
1248 iov[1].iov_base = buf.get() + len1;
1250 iov[2].iov_base = buf.get() + len1;
1251 iov[2].iov_len = len2;
1252 iov[3].iov_base = buf.get() + len1 + len2;
1255 socket->writev(&wcb, iov, iovCount);
1257 evb.loop(); // loop until the data is sent
1259 CHECK_EQ(wcb.state, STATE_SUCCEEDED);
1260 rcb.verifyData(buf.get(), len1 + len2);
1262 ASSERT_TRUE(socket->isClosedBySelf());
1263 ASSERT_FALSE(socket->isClosedByPeer());
1266 ///////////////////////////////////////////////////////////////////////////
1267 // close() related tests
1268 ///////////////////////////////////////////////////////////////////////////
1271 * Test calling close() with pending writes when the socket is already closing.
// Queues writes until five of them are blocked, arranges for each blocked
// write's error callback to call close() on the socket, and then verifies
// that every pending write callback ends up in STATE_FAILED.
1273 TEST(AsyncSocketTest, ClosePendingWritesWhileClosing) {
1278 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
1280 socket->connect(&ccb, server.getAddress(), 30);
1282 // accept the socket on the server side
1283 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
1285 // Loop to ensure the connect has completed
1288 // Make sure we are connected
1289 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
1291 // Schedule pending writes, until several write attempts have blocked
1293 memset(buf, 'a', sizeof(buf));
1294 typedef vector< std::shared_ptr<WriteCallback> > WriteCallbackVector;
1295 WriteCallbackVector writeCallbacks;
// Only writes that did not complete immediately are stored; the loop keeps
// writing until five such callbacks have been collected.
1297 writeCallbacks.reserve(5);
1298 while (writeCallbacks.size() < 5) {
1299 std::shared_ptr<WriteCallback> wcb(new WriteCallback);
1301 socket->write(wcb.get(), buf, sizeof(buf));
1302 if (wcb->state == STATE_SUCCEEDED) {
1303 // Succeeded immediately. Keep performing more writes
1307 // This write is blocked.
1308 // Have the write callback call close() when writeError() is invoked
// This makes close() run re-entrantly while the socket is already failing
// its pending writes, which is the scenario under test.
1309 wcb->errorCallback = std::bind(&AsyncSocket::close, socket.get());
1310 writeCallbacks.push_back(wcb);
1313 // Call closeNow() to immediately fail the pending writes
1316 // Make sure writeError() was invoked on all of the pending write callbacks
1317 for (WriteCallbackVector::const_iterator it = writeCallbacks.begin();
1318 it != writeCallbacks.end();
1320 CHECK_EQ((*it)->state, STATE_FAILED);
1323 ASSERT_TRUE(socket->isClosedBySelf());
1324 ASSERT_FALSE(socket->isClosedByPeer());
1327 ///////////////////////////////////////////////////////////////////////////
1328 // ImmediateRead related tests
1329 ///////////////////////////////////////////////////////////////////////////
1331 /* AsyncSocket subclass used to verify that the immediate-read path works */
// It overrides checkForImmediateRead() to record that the hook was invoked
// and then calls AsyncSocket::handleRead() directly.
1332 class AsyncSocketImmediateRead : public folly::AsyncSocket {
// Set to true every time checkForImmediateRead() runs; tests reset it to
// false before the phase they want to observe.
1334 bool immediateReadCalled = false;
1335 explicit AsyncSocketImmediateRead(folly::EventBase* evb) : AsyncSocket(evb) {}
1337 void checkForImmediateRead() noexcept override {
1338 immediateReadCalled = true;
1339 AsyncSocket::handleRead();
// The client sends 300 bytes, the server echoes them back and closes, and the
// client reads with at most one read per event. The test asserts that all the
// echoed data is received and that checkForImmediateRead() was invoked.
1343 TEST(AsyncSocket, ConnectReadImmediateRead) {
1346 const size_t maxBufferSz = 100;
1347 const size_t maxReadsPerEvent = 1;
// Three times maxBufferSz; presumably so a single read cannot drain the
// echoed data (assumes ReadCallback's argument caps the read size - confirm).
1348 const size_t expectedDataSz = maxBufferSz * 3;
1349 char expectedData[expectedDataSz];
1350 memset(expectedData, 'j', expectedDataSz);
1353 ReadCallback rcb(maxBufferSz);
1354 AsyncSocketImmediateRead socket(&evb);
1355 socket.connect(nullptr, server.getAddress(), 30);
1357 evb.loop(); // loop until the socket is connected
1359 socket.setReadCB(&rcb);
1360 socket.setMaxReadsPerEvent(maxReadsPerEvent);
// Clear the flag so that only calls made from this point on are observed.
1361 socket.immediateReadCalled = false;
1363 auto acceptedSocket = server.acceptAsync(&evb);
1365 ReadCallback rcbServer;
1366 WriteCallback wcbServer;
// Server side: once the full payload has arrived, verify it, echo it back
// and close the accepted socket.
1367 rcbServer.dataAvailableCallback = [&]() {
1368 if (rcbServer.dataRead() == expectedDataSz) {
1369 // write back all data read
1370 rcbServer.verifyData(expectedData, expectedDataSz);
1371 acceptedSocket->write(&wcbServer, expectedData, expectedDataSz);
1372 acceptedSocket->close();
1375 acceptedSocket->setReadCB(&rcbServer);
1379 socket.write(&wcb1, expectedData, expectedDataSz);
1381 CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
1382 rcb.verifyData(expectedData, expectedDataSz);
1383 CHECK_EQ(socket.immediateReadCalled, true);
1385 ASSERT_FALSE(socket.isClosedBySelf());
1386 ASSERT_FALSE(socket.isClosedByPeer());
// Same setup as ConnectReadImmediateRead, except that the client's read
// callback uninstalls itself as soon as data arrives. The test asserts that
// only maxBufferSz bytes were read and that checkForImmediateRead() was not
// invoked afterwards.
1389 TEST(AsyncSocket, ConnectReadUninstallRead) {
1392 const size_t maxBufferSz = 100;
1393 const size_t maxReadsPerEvent = 1;
1394 const size_t expectedDataSz = maxBufferSz * 3;
1395 char expectedData[expectedDataSz];
1396 memset(expectedData, 'k', expectedDataSz);
1399 ReadCallback rcb(maxBufferSz);
1400 AsyncSocketImmediateRead socket(&evb);
1401 socket.connect(nullptr, server.getAddress(), 30);
1403 evb.loop(); // loop until the socket is connected
1405 socket.setReadCB(&rcb);
1406 socket.setMaxReadsPerEvent(maxReadsPerEvent);
// Clear the flag so that only calls made from this point on are observed.
1407 socket.immediateReadCalled = false;
1409 auto acceptedSocket = server.acceptAsync(&evb);
1411 ReadCallback rcbServer;
1412 WriteCallback wcbServer;
// Server side: once the full payload has arrived, verify it, echo it back
// and close the accepted socket.
1413 rcbServer.dataAvailableCallback = [&]() {
1414 if (rcbServer.dataRead() == expectedDataSz) {
1415 // write back all data read
1416 rcbServer.verifyData(expectedData, expectedDataSz);
1417 acceptedSocket->write(&wcbServer, expectedData, expectedDataSz);
1418 acceptedSocket->close();
1421 acceptedSocket->setReadCB(&rcbServer);
1423 rcb.dataAvailableCallback = [&]() {
1424 // we read data and reset readCB
1425 socket.setReadCB(nullptr);
1430 socket.write(&wcb, expectedData, expectedDataSz);
1432 CHECK_EQ(wcb.state, STATE_SUCCEEDED);
1434 /* we should've only read maxBufferSz data since readCallback_
1435 * was reset in dataAvailableCallback */
1436 CHECK_EQ(rcb.dataRead(), maxBufferSz);
1437 CHECK_EQ(socket.immediateReadCalled, false);
1439 ASSERT_FALSE(socket.isClosedBySelf());
1440 ASSERT_FALSE(socket.isClosedByPeer());
1444 // - Test connect() and have the connect callback set the read callback
1445 // - Test connect() and have the connect callback unset the read callback
1446 // - Test reading/writing/closing/destroying the socket in the connect callback
1447 // - Test reading/writing/closing/destroying the socket in the read callback
1448 // - Test reading/writing/closing/destroying the socket in the write callback
1449 // - Test one-way shutdown behavior
1450 // - Test changing the EventBase
1452 // - TODO: test multiple threads sharing an AsyncSocket, and detaching from it
1453 // in connectSuccess(), readDataAvailable(), writeSuccess()
1456 ///////////////////////////////////////////////////////////////////////////
1457 // AsyncServerSocket tests
1458 ///////////////////////////////////////////////////////////////////////////
1461 * Helper ConnectionEventCallback class for the test code.
1462 * It maintains counters protected by a spin lock.
// Counts the connection-lifecycle notifications delivered through
// AsyncServerSocket::ConnectionEventCallback. Counter access is guarded by
// spinLock_: event handlers take a WriteHolder, getters take a ReadHolder.
1464 class TestConnectionEventCallback :
1465 public AsyncServerSocket::ConnectionEventCallback {
// --- Event handlers: each one bumps the counter matching its event. ---
1467 virtual void onConnectionAccepted(
1468 const int /* socket */,
1469 const SocketAddress& /* addr */) noexcept override {
1470 folly::RWSpinLock::WriteHolder holder(spinLock_);
1471 connectionAccepted_++;
1474 virtual void onConnectionAcceptError(const int /* err */) noexcept override {
1475 folly::RWSpinLock::WriteHolder holder(spinLock_);
1476 connectionAcceptedError_++;
1479 virtual void onConnectionDropped(
1480 const int /* socket */,
1481 const SocketAddress& /* addr */) noexcept override {
1482 folly::RWSpinLock::WriteHolder holder(spinLock_);
1483 connectionDropped_++;
1486 virtual void onConnectionEnqueuedForAcceptorCallback(
1487 const int /* socket */,
1488 const SocketAddress& /* addr */) noexcept override {
1489 folly::RWSpinLock::WriteHolder holder(spinLock_);
1490 connectionEnqueuedForAcceptCallback_++;
1493 virtual void onConnectionDequeuedByAcceptorCallback(
1494 const int /* socket */,
1495 const SocketAddress& /* addr */) noexcept override {
1496 folly::RWSpinLock::WriteHolder holder(spinLock_);
1497 connectionDequeuedByAcceptCallback_++;
1500 virtual void onBackoffStarted() noexcept override {
1501 folly::RWSpinLock::WriteHolder holder(spinLock_);
1505 virtual void onBackoffEnded() noexcept override {
1506 folly::RWSpinLock::WriteHolder holder(spinLock_);
1510 virtual void onBackoffError() noexcept override {
1511 folly::RWSpinLock::WriteHolder holder(spinLock_);
// --- Getters: each returns the current value of one counter. ---
1515 unsigned int getConnectionAccepted() const {
1516 folly::RWSpinLock::ReadHolder holder(spinLock_);
1517 return connectionAccepted_;
1520 unsigned int getConnectionAcceptedError() const {
1521 folly::RWSpinLock::ReadHolder holder(spinLock_);
1522 return connectionAcceptedError_;
1525 unsigned int getConnectionDropped() const {
1526 folly::RWSpinLock::ReadHolder holder(spinLock_);
1527 return connectionDropped_;
1530 unsigned int getConnectionEnqueuedForAcceptCallback() const {
1531 folly::RWSpinLock::ReadHolder holder(spinLock_);
1532 return connectionEnqueuedForAcceptCallback_;
1535 unsigned int getConnectionDequeuedByAcceptCallback() const {
1536 folly::RWSpinLock::ReadHolder holder(spinLock_);
1537 return connectionDequeuedByAcceptCallback_;
1540 unsigned int getBackoffStarted() const {
1541 folly::RWSpinLock::ReadHolder holder(spinLock_);
1542 return backoffStarted_;
1545 unsigned int getBackoffEnded() const {
1546 folly::RWSpinLock::ReadHolder holder(spinLock_);
1547 return backoffEnded_;
1550 unsigned int getBackoffError() const {
1551 folly::RWSpinLock::ReadHolder holder(spinLock_);
1552 return backoffError_;
// mutable so that the const getters can still acquire the lock.
1556 mutable folly::RWSpinLock spinLock_;
// All counters start at zero.
1557 unsigned int connectionAccepted_{0};
1558 unsigned int connectionAcceptedError_{0};
1559 unsigned int connectionDropped_{0};
1560 unsigned int connectionEnqueuedForAcceptCallback_{0};
1561 unsigned int connectionDequeuedByAcceptCallback_{0};
1562 unsigned int backoffStarted_{0};
1563 unsigned int backoffEnded_{0};
1564 unsigned int backoffError_{0};
1568 * Helper AcceptCallback class for the test code
1569 * It records the callbacks that were invoked, and also supports calling
1570 * generic std::function objects in each callback.
// AcceptCallback implementation for tests. Every callback invocation is
// appended to events_ as an EventInfo, after which the matching optional
// std::function hook is checked and, where visible here, invoked.
1572 class TestAcceptCallback : public AsyncServerSocket::AcceptCallback {
// EventInfo constructors: one per kind of recorded event (accepted
// connection, error message, or a bare event type).
1581 EventInfo(int fd, const folly::SocketAddress& addr)
1582 : type(TYPE_ACCEPT),
1586 explicit EventInfo(const std::string& msg)
1591 explicit EventInfo(EventType et)
1598 int fd; // valid for TYPE_ACCEPT
1599 folly::SocketAddress address; // valid for TYPE_ACCEPT
1600 string errorMsg; // valid for TYPE_ERROR
1602 typedef std::deque<EventInfo> EventList;
1604 TestAcceptCallback()
1605 : connectionAcceptedFn_(),
// Gives tests access to the recorded event sequence.
1610 std::deque<EventInfo>* getEvents() {
// Setters for the optional per-callback hooks. An unset (empty) hook is
// skipped by the corresponding callback below.
1614 void setConnectionAcceptedFn(
1615 const std::function<void(int, const folly::SocketAddress&)>& fn) {
1616 connectionAcceptedFn_ = fn;
1618 void setAcceptErrorFn(const std::function<void(const std::exception&)>& fn) {
1619 acceptErrorFn_ = fn;
1621 void setAcceptStartedFn(const std::function<void()>& fn) {
1622 acceptStartedFn_ = fn;
1624 void setAcceptStoppedFn(const std::function<void()>& fn) {
1625 acceptStoppedFn_ = fn;
// AcceptCallback overrides: record the event first, then run the hook.
1628 void connectionAccepted(
1629 int fd, const folly::SocketAddress& clientAddr) noexcept override {
1630 events_.emplace_back(fd, clientAddr);
1632 if (connectionAcceptedFn_) {
1633 connectionAcceptedFn_(fd, clientAddr);
1636 void acceptError(const std::exception& ex) noexcept override {
// Only the exception's message is stored, not the exception object.
1637 events_.emplace_back(ex.what());
1639 if (acceptErrorFn_) {
1643 void acceptStarted() noexcept override {
1644 events_.emplace_back(TYPE_START);
1646 if (acceptStartedFn_) {
1650 void acceptStopped() noexcept override {
1651 events_.emplace_back(TYPE_STOP);
1653 if (acceptStoppedFn_) {
1659 std::function<void(int, const folly::SocketAddress&)> connectionAcceptedFn_;
1660 std::function<void(const std::exception&)> acceptErrorFn_;
1661 std::function<void()> acceptStartedFn_;
1662 std::function<void()> acceptStoppedFn_;
// Recorded events, in the order the callbacks were invoked.
1664 std::deque<EventInfo> events_;
1669 * Make sure accepted sockets have O_NONBLOCK and TCP_NODELAY set
// Accepts a single connection through AsyncServerSocket and inspects the
// accepted file descriptor's O_NONBLOCK flag and TCP_NODELAY option.
1671 TEST(AsyncSocketTest, ServerAcceptOptions) {
1672 EventBase eventBase;
1674 // Create a server socket
1675 std::shared_ptr<AsyncServerSocket> serverSocket(
1676 AsyncServerSocket::newSocket(&eventBase));
// Bind to port 0, then read the resulting address back so the client can
// connect to it.
1677 serverSocket->bind(0);
1678 serverSocket->listen(16);
1679 folly::SocketAddress serverAddress;
1680 serverSocket->getAddress(&serverAddress);
1682 // Add a callback to accept one connection then stop the loop
1683 TestAcceptCallback acceptCallback;
1684 acceptCallback.setConnectionAcceptedFn(
1685 [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1686 serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
// An accept error also removes the callback.
1688 acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
1689 serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
1691 serverSocket->addAcceptCallback(&acceptCallback, nullptr);
1692 serverSocket->startAccepting();
1694 // Connect to the server socket
1695 std::shared_ptr<AsyncSocket> socket(
1696 AsyncSocket::newSocket(&eventBase, serverAddress));
1700 // Verify that the server accepted a connection
// Expected event sequence: start, one accept, stop.
1701 CHECK_EQ(acceptCallback.getEvents()->size(), 3);
1702 CHECK_EQ(acceptCallback.getEvents()->at(0).type,
1703 TestAcceptCallback::TYPE_START);
1704 CHECK_EQ(acceptCallback.getEvents()->at(1).type,
1705 TestAcceptCallback::TYPE_ACCEPT);
1706 CHECK_EQ(acceptCallback.getEvents()->at(2).type,
1707 TestAcceptCallback::TYPE_STOP);
// Event 1 is the accept event, which carries the accepted fd.
1708 int fd = acceptCallback.getEvents()->at(1).fd;
1710 // The accepted connection should already be in non-blocking mode
1711 int flags = fcntl(fd, F_GETFL, 0);
1712 CHECK_EQ(flags & O_NONBLOCK, O_NONBLOCK);
1715 // The accepted connection should already have TCP_NODELAY set
1717 socklen_t valueLength = sizeof(value);
1718 int rc = getsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &value, &valueLength);
1725 * Test AsyncServerSocket::removeAcceptCallback()
// Registers seven accept callbacks on one server socket and has them remove
// each other (and themselves) from inside connectionAccepted(), then checks
// the exact event sequence that each callback recorded.
1727 TEST(AsyncSocketTest, RemoveAcceptCallback) {
1728 // Create a new AsyncServerSocket
1729 EventBase eventBase;
1730 std::shared_ptr<AsyncServerSocket> serverSocket(
1731 AsyncServerSocket::newSocket(&eventBase));
1732 serverSocket->bind(0);
1733 serverSocket->listen(16);
1734 folly::SocketAddress serverAddress;
1735 serverSocket->getAddress(&serverAddress);
1737 // Add several accept callbacks
1738 TestAcceptCallback cb1;
1739 TestAcceptCallback cb2;
1740 TestAcceptCallback cb3;
1741 TestAcceptCallback cb4;
1742 TestAcceptCallback cb5;
1743 TestAcceptCallback cb6;
1744 TestAcceptCallback cb7;
1746 // Test having callbacks remove other callbacks before them on the list,
1747 // after them on the list, or removing themselves.
// The trailing annotation on each newSocket() line appears to name the
// callback expected to receive that connection, followed by the callbacks it
// removes (written as -cbN).
1749 // Have callback 2 remove callback 3 and callback 5 the first time it is
1752 cb1.setConnectionAcceptedFn([&](int /* fd */,
1753 const folly::SocketAddress& /* addr */) {
1754 std::shared_ptr<AsyncSocket> sock2(
1755 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb2: -cb3 -cb5
// cb3's hook is an intentional no-op.
1757 cb3.setConnectionAcceptedFn(
1758 [&](int /* fd */, const folly::SocketAddress& /* addr */) {});
1759 cb4.setConnectionAcceptedFn(
1760 [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1761 std::shared_ptr<AsyncSocket> sock3(
1762 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb4
1764 cb5.setConnectionAcceptedFn(
1765 [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1766 std::shared_ptr<AsyncSocket> sock5(
1767 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb7: -cb7
1770 cb2.setConnectionAcceptedFn(
1771 [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1772 if (cb2Count == 0) {
1773 serverSocket->removeAcceptCallback(&cb3, nullptr);
1774 serverSocket->removeAcceptCallback(&cb5, nullptr);
1778 // Have callback 6 remove callback 4 the first time it is called,
1779 // and destroy the server socket the second time it is called
1781 cb6.setConnectionAcceptedFn(
1782 [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1783 if (cb6Count == 0) {
1784 serverSocket->removeAcceptCallback(&cb4, nullptr);
1785 std::shared_ptr<AsyncSocket> sock6(
1786 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
1787 std::shared_ptr<AsyncSocket> sock7(
1788 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb2
1789 std::shared_ptr<AsyncSocket> sock8(
1790 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb6: stop
// Releases this test's shared_ptr to the server socket from inside the
// callback.
1793 serverSocket.reset();
1797 // Have callback 7 remove itself
1798 cb7.setConnectionAcceptedFn(
1799 [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1800 serverSocket->removeAcceptCallback(&cb7, nullptr);
// Registration order matters: the checks below assume the callbacks are
// served in this order.
1803 serverSocket->addAcceptCallback(&cb1, nullptr);
1804 serverSocket->addAcceptCallback(&cb2, nullptr);
1805 serverSocket->addAcceptCallback(&cb3, nullptr);
1806 serverSocket->addAcceptCallback(&cb4, nullptr);
1807 serverSocket->addAcceptCallback(&cb5, nullptr);
1808 serverSocket->addAcceptCallback(&cb6, nullptr);
1809 serverSocket->addAcceptCallback(&cb7, nullptr);
1810 serverSocket->startAccepting();
1812 // Make several connections to the socket
1813 std::shared_ptr<AsyncSocket> sock1(
1814 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
1815 std::shared_ptr<AsyncSocket> sock4(
1816 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb6: -cb4
1818 // Loop until we are stopped
1821 // Check to make sure that the expected callbacks were invoked.
1823 // NOTE: This code depends on AsyncServerSocket calling all of
1824 // the AcceptCallbacks in round-robin fashion, in the order that they were
1825 // added. The code is implemented this way right now, but the API doesn't
1826 // explicitly require it be done this way. If we change the code not to be
1827 // exactly round robin in the future, we can simplify the test checks here.
1828 // (We'll also need to update the termination code, since we expect cb6 to
1829 // get called twice to terminate the loop.)
// cb1: start, two accepts, stop.
1830 CHECK_EQ(cb1.getEvents()->size(), 4);
1831 CHECK_EQ(cb1.getEvents()->at(0).type,
1832 TestAcceptCallback::TYPE_START);
1833 CHECK_EQ(cb1.getEvents()->at(1).type,
1834 TestAcceptCallback::TYPE_ACCEPT);
1835 CHECK_EQ(cb1.getEvents()->at(2).type,
1836 TestAcceptCallback::TYPE_ACCEPT);
1837 CHECK_EQ(cb1.getEvents()->at(3).type,
1838 TestAcceptCallback::TYPE_STOP);
// cb2: start, two accepts, stop.
1840 CHECK_EQ(cb2.getEvents()->size(), 4);
1841 CHECK_EQ(cb2.getEvents()->at(0).type,
1842 TestAcceptCallback::TYPE_START);
1843 CHECK_EQ(cb2.getEvents()->at(1).type,
1844 TestAcceptCallback::TYPE_ACCEPT);
1845 CHECK_EQ(cb2.getEvents()->at(2).type,
1846 TestAcceptCallback::TYPE_ACCEPT);
1847 CHECK_EQ(cb2.getEvents()->at(3).type,
1848 TestAcceptCallback::TYPE_STOP);
// cb3: removed by cb2 before it ever accepted anything.
1850 CHECK_EQ(cb3.getEvents()->size(), 2);
1851 CHECK_EQ(cb3.getEvents()->at(0).type,
1852 TestAcceptCallback::TYPE_START);
1853 CHECK_EQ(cb3.getEvents()->at(1).type,
1854 TestAcceptCallback::TYPE_STOP);
// cb4: one accept before cb6 removed it.
1856 CHECK_EQ(cb4.getEvents()->size(), 3);
1857 CHECK_EQ(cb4.getEvents()->at(0).type,
1858 TestAcceptCallback::TYPE_START);
1859 CHECK_EQ(cb4.getEvents()->at(1).type,
1860 TestAcceptCallback::TYPE_ACCEPT);
1861 CHECK_EQ(cb4.getEvents()->at(2).type,
1862 TestAcceptCallback::TYPE_STOP);
// cb5: removed by cb2 before it ever accepted anything.
1864 CHECK_EQ(cb5.getEvents()->size(), 2);
1865 CHECK_EQ(cb5.getEvents()->at(0).type,
1866 TestAcceptCallback::TYPE_START);
1867 CHECK_EQ(cb5.getEvents()->at(1).type,
1868 TestAcceptCallback::TYPE_STOP);
// cb6: start, two accepts, stop.
1870 CHECK_EQ(cb6.getEvents()->size(), 4);
1871 CHECK_EQ(cb6.getEvents()->at(0).type,
1872 TestAcceptCallback::TYPE_START);
1873 CHECK_EQ(cb6.getEvents()->at(1).type,
1874 TestAcceptCallback::TYPE_ACCEPT);
1875 CHECK_EQ(cb6.getEvents()->at(2).type,
1876 TestAcceptCallback::TYPE_ACCEPT);
1877 CHECK_EQ(cb6.getEvents()->at(3).type,
1878 TestAcceptCallback::TYPE_STOP);
// cb7: one accept, during which it removed itself.
1880 CHECK_EQ(cb7.getEvents()->size(), 3);
1881 CHECK_EQ(cb7.getEvents()->at(0).type,
1882 TestAcceptCallback::TYPE_START);
1883 CHECK_EQ(cb7.getEvents()->at(1).type,
1884 TestAcceptCallback::TYPE_ACCEPT);
1885 CHECK_EQ(cb7.getEvents()->at(2).type,
1886 TestAcceptCallback::TYPE_STOP);
1890 * Test that accept callbacks are invoked in the thread that runs the event loop
// Registers one accept callback from the test thread, runs the event loop in
// a second thread, and checks that acceptStarted(), connectionAccepted() and
// acceptStopped() are all invoked on one thread that differs from the test
// thread.
1892 TEST(AsyncSocketTest, OtherThreadAcceptCallback) {
1893 // Create a new AsyncServerSocket
1894 EventBase eventBase;
1895 std::shared_ptr<AsyncServerSocket> serverSocket(
1896 AsyncServerSocket::newSocket(&eventBase));
1897 serverSocket->bind(0);
1898 serverSocket->listen(16);
1899 folly::SocketAddress serverAddress;
1900 serverSocket->getAddress(&serverAddress);
1902 // Add a single accept callback that checks which thread invokes it
1903 TestAcceptCallback cb1;
// Starts out as the test thread's id. acceptStarted() asserts it is running
// on a different thread and stores that thread's id for the later checks.
1904 auto thread_id = pthread_self();
1905 cb1.setAcceptStartedFn([&](){
1906 CHECK_NE(thread_id, pthread_self());
1907 thread_id = pthread_self();
1909 cb1.setConnectionAcceptedFn(
1910 [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1911 CHECK_EQ(thread_id, pthread_self());
1912 serverSocket->removeAcceptCallback(&cb1, nullptr);
1914 cb1.setAcceptStoppedFn([&](){
1915 CHECK_EQ(thread_id, pthread_self());
1918 // Register the callback and start accepting connections
1919 serverSocket->addAcceptCallback(&cb1, nullptr);
1920 serverSocket->startAccepting();
1922 // Make one connection to the socket
1923 std::shared_ptr<AsyncSocket> sock1(
1924 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
1926 // Loop in another thread
1927 auto other = std::thread([&](){
1932 // Check to make sure that the expected callbacks were invoked.
1934 // cb1 is the only callback registered here and it removes itself
1935 // after its first accepted connection, so it should have recorded
1936 // exactly three events: start, one accept, and stop.
1937 //
1938 // The per-thread expectations are asserted inside the hooks above;
1939 // the checks below only validate the recorded event sequence.
1940 //
1941 CHECK_EQ(cb1.getEvents()->size(), 3);
1942 CHECK_EQ(cb1.getEvents()->at(0).type,
1943 TestAcceptCallback::TYPE_START);
1944 CHECK_EQ(cb1.getEvents()->at(1).type,
1945 TestAcceptCallback::TYPE_ACCEPT);
1946 CHECK_EQ(cb1.getEvents()->at(2).type,
1947 TestAcceptCallback::TYPE_STOP);
// Shared helper: verifies that an already-listening AsyncServerSocket can
// accept one connection.
//
// serverSocket: the socket under test. It is used for its EventBase and its
// bound address, and must stay alive for the duration of the call.
//
// Fails via CHECK_EQ unless the accept callback records exactly: start, one
// accept, stop.
1951 void serverSocketSanityTest(AsyncServerSocket* serverSocket) {
1952 // Add a callback to accept one connection then stop accepting
1953 TestAcceptCallback acceptCallback;
1954 acceptCallback.setConnectionAcceptedFn(
1955 [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1956 serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
// An accept error also removes the callback.
1958 acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
1959 serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
1961 serverSocket->addAcceptCallback(&acceptCallback, nullptr);
1962 serverSocket->startAccepting();
1964 // Connect to the server socket
// The client uses the server socket's own EventBase and the address the
// server socket reports for itself.
1965 EventBase* eventBase = serverSocket->getEventBase();
1966 folly::SocketAddress serverAddress;
1967 serverSocket->getAddress(&serverAddress);
1968 AsyncSocket::UniquePtr socket(new AsyncSocket(eventBase, serverAddress));
1970 // Loop to process all events
1973 // Verify that the server accepted a connection
1974 CHECK_EQ(acceptCallback.getEvents()->size(), 3);
1975 CHECK_EQ(acceptCallback.getEvents()->at(0).type,
1976 TestAcceptCallback::TYPE_START);
1977 CHECK_EQ(acceptCallback.getEvents()->at(1).type,
1978 TestAcceptCallback::TYPE_ACCEPT);
1979 CHECK_EQ(acceptCallback.getEvents()->at(2).type,
1980 TestAcceptCallback::TYPE_STOP);
1983 /* Verify that we don't leak sockets if we are destroyed()
1984 * and there are still writes pending
1986 * If destroy() only calls close() instead of closeNow(),
1987 * it would shutdown(writes) on the socket, but it would
1988 * never be close()'d, and the socket would leak
// Queues a large write on the accepted socket, drops the last reference to
// that socket while the write is outstanding, and then probes the raw file
// descriptor with read() to check that it was really closed.
1990 TEST(AsyncSocketTest, DestroyCloseTest) {
1996 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&clientEB);
1998 socket->connect(&ccb, server.getAddress(), 30);
2000 // Accept the connection
2001 std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&serverEB);
2003 acceptedSocket->setReadCB(&rcb);
2005 // Write a large buffer to the socket that is larger than kernel buffer
2006 size_t simpleBufLength = 5000000;
// NOTE(review): allocated with raw new[]; no matching delete[] is visible in
// this excerpt - confirm it is released at the end of the test.
2007 char* simpleBuf = new char[simpleBufLength];
2008 memset(simpleBuf, 'a', simpleBufLength);
2011 // Remember the raw fd so it can still be probed after the socket is destroyed
2012 int fd = acceptedSocket->getFd();
2014 acceptedSocket->write(&wcb, simpleBuf, simpleBufLength);
// Drop this test's reference while the write above is still outstanding.
2016 acceptedSocket.reset();
2018 // Test that server socket was closed
2019 ssize_t sz = read(fd, simpleBuf, simpleBufLength);
2026 * Test AsyncServerSocket::useExistingSocket()
// Covers three ways of handing a manually created socket to
// AsyncServerSocket::useExistingSocket():
//   1. a fresh socket, with AsyncServerSocket doing bind() and listen();
//   2. a socket that is already bound, with AsyncServerSocket doing listen();
//   3. a socket that is already bound and listening.
// Each scenario finishes by running serverSocketSanityTest().
2028 TEST(AsyncSocketTest, ServerExistingSocket) {
2029 EventBase eventBase;
2031 // Test creating a socket, and letting AsyncServerSocket bind and listen
2033 // Manually create a socket
2034 int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
2037 // Create a server socket
2038 AsyncServerSocket::UniquePtr serverSocket(
2039 new AsyncServerSocket(&eventBase));
2040 serverSocket->useExistingSocket(fd);
// Query the address of the not-yet-bound socket and bind to that address.
2041 folly::SocketAddress address;
2042 serverSocket->getAddress(&address);
2044 serverSocket->bind(address);
2045 serverSocket->listen(16);
2047 // Make sure the socket works
2048 serverSocketSanityTest(serverSocket.get());
2051 // Test creating a socket and binding manually,
2052 // then letting AsyncServerSocket listen
2054 // Manually create a socket
2055 int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
2058 struct sockaddr_in addr;
2059 addr.sin_family = AF_INET;
2061 addr.sin_addr.s_addr = INADDR_ANY;
2062 CHECK_EQ(bind(fd, reinterpret_cast<struct sockaddr*>(&addr),
2064 // Look up the address that we bound to
2065 folly::SocketAddress boundAddress;
2066 boundAddress.setFromLocalAddress(fd);
2068 // Create a server socket
2069 AsyncServerSocket::UniquePtr serverSocket(
2070 new AsyncServerSocket(&eventBase));
2071 serverSocket->useExistingSocket(fd);
2072 serverSocket->listen(16);
2074 // Make sure AsyncServerSocket reports the same address that we bound to
2075 folly::SocketAddress serverSocketAddress;
2076 serverSocket->getAddress(&serverSocketAddress);
2077 CHECK_EQ(boundAddress, serverSocketAddress);
2079 // Make sure the socket works
2080 serverSocketSanityTest(serverSocket.get());
2083 // Test creating a socket, binding and listening manually,
2084 // then giving it to AsyncServerSocket
2086 // Manually create a socket
2087 int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
2090 struct sockaddr_in addr;
2091 addr.sin_family = AF_INET;
2093 addr.sin_addr.s_addr = INADDR_ANY;
2094 CHECK_EQ(bind(fd, reinterpret_cast<struct sockaddr*>(&addr),
2096 // Look up the address that we bound to
2097 folly::SocketAddress boundAddress;
2098 boundAddress.setFromLocalAddress(fd);
// listen() is called on the raw fd here, before AsyncServerSocket adopts it.
2100 CHECK_EQ(listen(fd, 16), 0);
2102 // Create a server socket
2103 AsyncServerSocket::UniquePtr serverSocket(
2104 new AsyncServerSocket(&eventBase));
2105 serverSocket->useExistingSocket(fd);
2107 // Make sure AsyncServerSocket reports the same address that we bound to
2108 folly::SocketAddress serverSocketAddress;
2109 serverSocket->getAddress(&serverSocketAddress);
2110 CHECK_EQ(boundAddress, serverSocketAddress);
2112 // Make sure the socket works
2113 serverSocketSanityTest(serverSocket.get());
// Binds an AsyncServerSocket to a filesystem path (a Unix domain socket),
// accepts one connection over it, and checks that the accepted fd is in
// non-blocking mode.
2117 TEST(AsyncSocketTest, UnixDomainSocketTest) {
2118 EventBase eventBase;
2120 // Create a server socket
2121 std::shared_ptr<AsyncServerSocket> serverSocket(
2122 AsyncServerSocket::newSocket(&eventBase));
// A random 64-bit suffix is appended to the path, presumably to avoid
// collisions between concurrent or repeated test runs.
2124 path.append(folly::to<string>("/anonymous", folly::Random::rand64()));
2125 folly::SocketAddress serverAddress;
2126 serverAddress.setFromPath(path);
2127 serverSocket->bind(serverAddress);
2128 serverSocket->listen(16);
2130 // Add a callback to accept one connection then stop the loop
2131 TestAcceptCallback acceptCallback;
2132 acceptCallback.setConnectionAcceptedFn(
2133 [&](int /* fd */, const folly::SocketAddress& /* addr */) {
2134 serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
// An accept error also removes the callback.
2136 acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
2137 serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
2139 serverSocket->addAcceptCallback(&acceptCallback, nullptr);
2140 serverSocket->startAccepting();
2142 // Connect to the server socket
2143 std::shared_ptr<AsyncSocket> socket(
2144 AsyncSocket::newSocket(&eventBase, serverAddress));
2148 // Verify that the server accepted a connection
// Expected event sequence: start, one accept, stop.
2149 CHECK_EQ(acceptCallback.getEvents()->size(), 3);
2150 CHECK_EQ(acceptCallback.getEvents()->at(0).type,
2151 TestAcceptCallback::TYPE_START);
2152 CHECK_EQ(acceptCallback.getEvents()->at(1).type,
2153 TestAcceptCallback::TYPE_ACCEPT);
2154 CHECK_EQ(acceptCallback.getEvents()->at(2).type,
2155 TestAcceptCallback::TYPE_STOP);
// Event 1 is the accept event, which carries the accepted fd.
2156 int fd = acceptCallback.getEvents()->at(1).fd;
2158 // The accepted connection should already be in non-blocking mode
2159 int flags = fcntl(fd, F_GETFL, 0);
2160 CHECK_EQ(flags & O_NONBLOCK, O_NONBLOCK);
// Installs a TestConnectionEventCallback on the server socket, accepts one
// connection, and checks the resulting counters: one accepted, one enqueued,
// one dequeued, and zero errors, drops and backoff events.
2163 TEST(AsyncSocketTest, ConnectionEventCallbackDefault) {
2164 EventBase eventBase;
2165 TestConnectionEventCallback connectionEventCallback;
2167 // Create a server socket
2168 std::shared_ptr<AsyncServerSocket> serverSocket(
2169 AsyncServerSocket::newSocket(&eventBase));
// The event callback is installed before bind() and listen().
2170 serverSocket->setConnectionEventCallback(&connectionEventCallback);
2171 serverSocket->bind(0);
2172 serverSocket->listen(16);
2173 folly::SocketAddress serverAddress;
2174 serverSocket->getAddress(&serverAddress);
2176 // Add a callback to accept one connection then stop the loop
2177 TestAcceptCallback acceptCallback;
2178 acceptCallback.setConnectionAcceptedFn(
2179 [&](int /* fd */, const folly::SocketAddress& /* addr */) {
2180 serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
// An accept error also removes the callback.
2182 acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
2183 serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
2185 serverSocket->addAcceptCallback(&acceptCallback, nullptr);
2186 serverSocket->startAccepting();
2188 // Connect to the server socket
2189 std::shared_ptr<AsyncSocket> socket(
2190 AsyncSocket::newSocket(&eventBase, serverAddress));
2194 // Validate the connection event counters
2195 ASSERT_EQ(connectionEventCallback.getConnectionAccepted(), 1);
2196 ASSERT_EQ(connectionEventCallback.getConnectionAcceptedError(), 0);
2197 ASSERT_EQ(connectionEventCallback.getConnectionDropped(), 0);
2199 connectionEventCallback.getConnectionEnqueuedForAcceptCallback(), 1);
2200 ASSERT_EQ(connectionEventCallback.getConnectionDequeuedByAcceptCallback(), 1);
2201 ASSERT_EQ(connectionEventCallback.getBackoffStarted(), 0);
2202 ASSERT_EQ(connectionEventCallback.getBackoffEnded(), 0);
2203 ASSERT_EQ(connectionEventCallback.getBackoffError(), 0);
2207 * Test AsyncServerSocket::getNumPendingMessagesInQueue()
// Exercises AsyncServerSocket::getNumPendingMessagesInQueue(): four client
// sockets are created against one server socket, and the connection-accepted
// handler checks that the reported number of pending messages equals
// `4 - count`.
2209 TEST(AsyncSocketTest, NumPendingMessagesInQueue) {
2210 EventBase eventBase;
2212 // Counter of how many connections have been accepted
2215 // Create a server socket
2216 auto serverSocket(AsyncServerSocket::newSocket(&eventBase));
// NOTE(review): presumably port 0 lets the OS choose a free port, which would
// explain why the bound address is read back with getAddress() -- confirm.
2217 serverSocket->bind(0);
2218 serverSocket->listen(16);
2219 folly::SocketAddress serverAddress;
2220 serverSocket->getAddress(&serverAddress);
2222 // Add a callback to accept connections
2223 TestAcceptCallback acceptCallback;
2224 acceptCallback.setConnectionAcceptedFn(
2225 [&](int /* fd */, const folly::SocketAddress& /* addr */) {
// NOTE(review): `count` is presumably the accepted-connection counter
// described by the comment near the top of this test; its declaration and
// update are not visible in this block -- confirm. CHECK_EQ is used here
// rather than a gtest assertion macro.
2227 CHECK_EQ(4 - count, serverSocket->getNumPendingMessagesInQueue());
2230 // all messages are processed, remove accept callback
2231 serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
// The error handler unregisters the callback with the same
// (callback, &eventBase) pair that addAcceptCallback() receives below.
2234 acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
2235 serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
// The accept callback is registered with an explicit EventBase (&eventBase),
// the same one the server socket was created on.
2237 serverSocket->addAcceptCallback(&acceptCallback, &eventBase);
2238 serverSocket->startAccepting();
2240 // Connect to the server socket, 4 clients, there are 4 connections
// All four client sockets share the server's EventBase and remain in scope
// as locals for the rest of the test body.
2241 auto socket1(AsyncSocket::newSocket(&eventBase, serverAddress));
2242 auto socket2(AsyncSocket::newSocket(&eventBase, serverAddress));
2243 auto socket3(AsyncSocket::newSocket(&eventBase, serverAddress));
2244 auto socket4(AsyncSocket::newSocket(&eventBase, serverAddress));
2250 * Test AsyncTransport::BufferCallback
// Writes a 100 KiB buffer through an AsyncSocket connected with the
// SO_SNDBUF option set to 128, with a buffer callback registered on the
// socket. Afterwards it checks that connect and write both reached
// STATE_SUCCEEDED, that the buffer callback reports hasBuffered() and
// hasBufferCleared(), and that the socket is closed by self, not by peer.
// NOTE(review): the declarations of `evb`, `server`, `ccb`, `wcb` and `bcb`
// are not visible in this block -- confirm their types against the full file.
2252 TEST(AsyncSocketTest, BufferTest) {
// NOTE(review): the tiny send buffer is presumably chosen so the 100 KiB
// write cannot complete in one pass and has to be buffered by the socket,
// which is what the buffer callback observes -- confirm.
2256 AsyncSocket::OptionMap option{{{SOL_SOCKET, SO_SNDBUF}, 128}};
2257 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
// NOTE(review): 30 is presumably the connect timeout (milliseconds?) --
// confirm against AsyncSocket::connect().
2259 socket->connect(&ccb, server.getAddress(), 30, option);
// 100 KiB payload filled with the byte 'c'.
2261 char buf[100 * 1024];
2262 memset(buf, 'c', sizeof(buf));
// The buffer callback is registered before the write is issued.
2265 socket->setBufferCallback(&bcb);
2266 socket->write(&wcb, buf, sizeof(buf), WriteFlags::NONE);
// Connect and write callbacks must both have recorded success.
2269 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
2270 CHECK_EQ(wcb.state, STATE_SUCCEEDED);
// The buffer callback must report both that data was buffered and that the
// buffer was subsequently cleared.
2272 ASSERT_TRUE(bcb.hasBuffered());
2273 ASSERT_TRUE(bcb.hasBufferCleared());
// NOTE(review): verifyConnection() presumably checks that the server side
// received exactly these sizeof(buf) bytes -- confirm in the test helper.
2276 server.verifyConnection(buf, sizeof(buf));
// The socket is expected to report a local close; the close call itself is
// not visible in this block.
2278 ASSERT_TRUE(socket->isClosedBySelf());
2279 ASSERT_FALSE(socket->isClosedByPeer());
2282 TEST(AsyncSocketTest, BufferCallbackKill) {
2285 AsyncSocket::OptionMap option{{{SOL_SOCKET, SO_SNDBUF}, 128}};
2286 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
2288 socket->connect(&ccb, server.getAddress(), 30, option);
2291 char buf[100 * 1024];
2292 memset(buf, 'c', sizeof(buf));
2293 BufferCallback* bcb = new BufferCallback;
2294 socket->setBufferCallback(bcb);
2296 wcb.successCallback = [&] {
2297 ASSERT_TRUE(socket.unique());
2301 // This will trigger AsyncSocket::handleWrite,
2302 // which calls WriteCallback::writeSuccess,
2303 // which calls wcb.successCallback above,
2304 // which tries to delete socket
2305 // Then, the socket will also try to use this BufferCallback
2306 // And that should crash us, if there is no DestructorGuard on the stack
2307 socket->write(&wcb, buf, sizeof(buf), WriteFlags::NONE);
2310 CHECK_EQ(ccb.state, STATE_SUCCEEDED);