2 * Copyright 2015 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 #include <folly/io/async/AsyncServerSocket.h>
17 #include <folly/io/async/AsyncSocket.h>
18 #include <folly/io/async/AsyncTimeout.h>
19 #include <folly/io/async/EventBase.h>
20 #include <folly/SocketAddress.h>
22 #include <folly/io/IOBuf.h>
23 #include <folly/io/async/test/AsyncSocketTest.h>
24 #include <folly/io/async/test/Util.h>
26 #include <gtest/gtest.h>
27 #include <boost/scoped_array.hpp>
32 #include <sys/types.h>
33 #include <sys/socket.h>
34 #include <netinet/tcp.h>
37 using namespace boost;
44 using std::unique_ptr;
45 using std::chrono::milliseconds;
46 using boost::scoped_array;
48 using namespace folly;
// Timer-driven test helper: an AsyncTimeout that, when it fires, writes a
// stored IOBuf chain to an AsyncSocket and calls shutdownWrite() on it.
// NOTE(review): parts of this class (access specifiers, the remaining member
// initializers, the cork_/lastWrite_ declarations and any condition guarding
// shutdownWrite()) are not visible in this excerpt -- confirm against the
// complete file.
50 class DelayedWrite: public AsyncTimeout {
// socket:    socket to write to; its EventBase is the one the timeout runs on.
// bufs:      IOBuf chain to write; moved into bufs_.
// wcb:       write callback handed to writeChain() when the timeout fires.
// cork:      selects WriteFlags::CORK (true) or WriteFlags::NONE (false).
// lastWrite: stored in lastWrite_ (defaults to false).
52 DelayedWrite(const std::shared_ptr<AsyncSocket>& socket,
53 unique_ptr<IOBuf>&& bufs, AsyncTransportWrapper::WriteCallback* wcb,
54 bool cork, bool lastWrite = false):
55 AsyncTimeout(socket->getEventBase()),
57 bufs_(std::move(bufs)),
60 lastWrite_(lastWrite) {}
// Invoked when the scheduled timeout elapses: performs the deferred write.
63 void timeoutExpired() noexcept override {
64 WriteFlags flags = cork_ ? WriteFlags::CORK : WriteFlags::NONE;
// bufs_ is passed with std::move, so this object gives up the chain here.
65 socket_->writeChain(wcb_, std::move(bufs_), flags);
// NOTE(review): presumably only done for the last write (see lastWrite_);
// the guarding condition is not visible here.
67 socket_->shutdownWrite();
71 std::shared_ptr<AsyncSocket> socket_;
72 unique_ptr<IOBuf> bufs_;
73 AsyncTransportWrapper::WriteCallback* wcb_;
78 ///////////////////////////////////////////////////////////////////////////
80 ///////////////////////////////////////////////////////////////////////////
83 * Test connecting to a server
// Connects a new AsyncSocket to server.getAddress() and expects the connect
// callback `cb` to finish in STATE_SUCCEEDED.
85 TEST(AsyncSocketTest, Connect) {
86 // Start listening on a local port
89 // Connect using an AsyncSocket
91 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
// The third argument (30) is presumably the connect timeout -- TODO confirm
// the unit against AsyncSocket::connect().
93 socket->connect(&cb, server.getAddress(), 30);
97 CHECK_EQ(cb.state, STATE_SUCCEEDED);
101 * Test connecting to a server that isn't listening
// Connects to 127.0.0.1:65535, where no listener is expected, and checks that
// the connect callback fails with AsyncSocketException::NOT_OPEN.
103 TEST(AsyncSocketTest, ConnectRefused) {
106 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
108 // Hopefully nothing is actually listening on this address
109 folly::SocketAddress addr("127.0.0.1", 65535);
111 socket->connect(&cb, addr, 30);
// A refused connection is expected to surface as a failed callback whose
// exception type is NOT_OPEN.
115 CHECK_EQ(cb.state, STATE_FAILED);
116 CHECK_EQ(cb.exception.getType(), AsyncSocketException::NOT_OPEN);
120 * Test connection timeout
// Starts a connect with a timeout argument of 1 to an address that is not
// expected to answer, then checks that the callback reports TIMED_OUT and that
// getPeerAddress() still returns the address that was attempted.
122 TEST(AsyncSocketTest, ConnectTimeout) {
125 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
127 // Try connecting to a server that won't respond.
129 // This depends somewhat on the network where this test is run.
130 // Hopefully this IP will be routable but unresponsive.
131 // (Alternatively, we could try listening on a local raw socket, but that
132 // normally requires root privileges.)
133 folly::SocketAddress addr("8.8.8.8", 65535);
135 socket->connect(&cb, addr, 1); // also set a ridiculously small timeout
139 CHECK_EQ(cb.state, STATE_FAILED);
140 CHECK_EQ(cb.exception.getType(), AsyncSocketException::TIMED_OUT);
142 // Verify that we can still get the peer address after a timeout.
143 // Use case is if the client was created from a client pool, and we want
144 // to log which peer failed.
145 folly::SocketAddress peer;
146 socket->getPeerAddress(&peer);
147 CHECK_EQ(peer, addr);
151 * Test writing immediately after connecting, without waiting for connect to finish.
// Issues connect() and then write() back to back, and expects both the connect
// callback and the write callback to succeed and the server to have received
// exactly the bytes in `buf`.
154 TEST(AsyncSocketTest, ConnectAndWrite) {
159 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
161 socket->connect(&ccb, server.getAddress(), 30);
// Payload: the whole buffer filled with 'a'.
165 memset(buf, 'a', sizeof(buf));
167 socket->write(&wcb, buf, sizeof(buf));
169 // Loop. We don't bother accepting on the server socket yet.
170 // The kernel should be able to buffer the write request so it can succeed.
173 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
174 CHECK_EQ(wcb.state, STATE_SUCCEEDED);
176 // Make sure the server got a connection and received the data
178 server.verifyConnection(buf, sizeof(buf));
182 * Test connecting using a nullptr connect callback.
// Calls connect() with a nullptr callback, then writes a buffer and uses the
// write callback plus server.verifyConnection() to confirm the socket works.
184 TEST(AsyncSocketTest, ConnectNullCallback) {
189 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
190 socket->connect(nullptr, server.getAddress(), 30);
192 // write some data, just so we have some way of verifying
193 // that the socket works correctly after connecting
195 memset(buf, 'a', sizeof(buf));
197 socket->write(&wcb, buf, sizeof(buf));
// With no connect callback, the write callback is the only success signal.
201 CHECK_EQ(wcb.state, STATE_SUCCEEDED);
203 // Make sure the server got a connection and received the data
205 server.verifyConnection(buf, sizeof(buf));
209 * Test calling both write() and close() immediately after connecting, without
210 * waiting for connect to finish.
212 * This exercises the STATE_CONNECTING_CLOSING code.
// Queues a write immediately after connect() and expects both callbacks to
// succeed and the server to receive the data.
// NOTE(review): the close() call named in the test title is on a line that is
// not visible in this excerpt.
214 TEST(AsyncSocketTest, ConnectWriteAndClose) {
219 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
221 socket->connect(&ccb, server.getAddress(), 30);
225 memset(buf, 'a', sizeof(buf));
227 socket->write(&wcb, buf, sizeof(buf));
232 // Loop. We don't bother accepting on the server socket yet.
233 // The kernel should be able to buffer the write request so it can succeed.
236 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
237 CHECK_EQ(wcb.state, STATE_SUCCEEDED);
239 // Make sure the server got a connection and received the data
240 server.verifyConnection(buf, sizeof(buf));
244 * Test calling close() immediately after connect()
// Starts a connect and expects the connect callback to end in STATE_FAILED,
// i.e. the attempt was aborted. If connect() already succeeded synchronously
// the scenario cannot be exercised and only an INFO message is logged.
// NOTE(review): the close() call itself is on a line not visible here.
246 TEST(AsyncSocketTest, ConnectAndClose) {
249 // Connect using an AsyncSocket
251 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
253 socket->connect(&ccb, server.getAddress(), 30);
255 // Hopefully the connect didn't succeed immediately.
256 // If it did, we can't exercise the close-while-connecting code path.
257 if (ccb.state == STATE_SUCCEEDED) {
258 LOG(INFO) << "connect() succeeded immediately; aborting test "
259 "of close-during-connect behavior";
265 // Loop, although there shouldn't be anything to do.
268 // Make sure the connection was aborted
269 CHECK_EQ(ccb.state, STATE_FAILED);
273 * Test calling closeNow() immediately after connect()
275 * This should be identical to the normal close behavior.
// Same shape as ConnectAndClose: starts a connect and expects the connect
// callback to end in STATE_FAILED. If connect() succeeded synchronously the
// scenario cannot be exercised and only an INFO message is logged.
// NOTE(review): the closeNow() call itself is on a line not visible here.
277 TEST(AsyncSocketTest, ConnectAndCloseNow) {
280 // Connect using an AsyncSocket
282 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
284 socket->connect(&ccb, server.getAddress(), 30);
286 // Hopefully the connect didn't succeed immediately.
287 // If it did, we can't exercise the close-while-connecting code path.
288 if (ccb.state == STATE_SUCCEEDED) {
289 LOG(INFO) << "connect() succeeded immediately; aborting test "
290 "of closeNow()-during-connect behavior";
296 // Loop, although there shouldn't be anything to do.
299 // Make sure the connection was aborted
300 CHECK_EQ(ccb.state, STATE_FAILED);
304 * Test calling both write() and closeNow() immediately after connecting,
305 * without waiting for connect to finish.
307 * This should abort the pending write.
// Starts a connect, queues a write while it is still pending, and expects both
// the connect callback and the write callback to end in STATE_FAILED.
// NOTE(review): the closeNow() call named in the title is on a line not
// visible in this excerpt.
309 TEST(AsyncSocketTest, ConnectWriteAndCloseNow) {
314 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
316 socket->connect(&ccb, server.getAddress(), 30);
318 // Hopefully the connect didn't succeed immediately.
319 // If it did, we can't exercise the close-while-connecting code path.
320 if (ccb.state == STATE_SUCCEEDED) {
321 LOG(INFO) << "connect() succeeded immediately; aborting test "
322 "of write-during-connect behavior";
328 memset(buf, 'a', sizeof(buf));
330 socket->write(&wcb, buf, sizeof(buf));
335 // Loop, although there shouldn't be anything to do.
// Both the pending connect and the queued write must have been failed.
338 CHECK_EQ(ccb.state, STATE_FAILED);
339 CHECK_EQ(wcb.state, STATE_FAILED);
343 * Test installing a read callback immediately, before connect() finishes.
// Installs a read callback right after connect(), has the accepted peer send
// one buffer of 'a' bytes and close, and checks that the read callback
// received exactly that buffer in a single chunk.
345 TEST(AsyncSocketTest, ConnectAndRead) {
350 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
352 socket->connect(&ccb, server.getAddress(), 30);
355 socket->setReadCB(&rcb);
357 // Even though we haven't looped yet, we should be able to accept
358 // the connection and send data to it.
359 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
361 memset(buf, 'a', sizeof(buf));
362 acceptedSocket->write(buf, sizeof(buf));
363 acceptedSocket->flush();
364 acceptedSocket->close();
366 // Loop, although there shouldn't be anything to do.
369 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
370 CHECK_EQ(rcb.state, STATE_SUCCEEDED);
// Exactly one buffer, of the full length, with identical contents.
371 CHECK_EQ(rcb.buffers.size(), 1);
372 CHECK_EQ(rcb.buffers[0].length, sizeof(buf));
373 CHECK_EQ(memcmp(rcb.buffers[0].buffer, buf, sizeof(buf)), 0);
377 * Test installing a read callback and then closing immediately before the
378 * connect attempt finishes.
// Installs a read callback while the connect is still pending and expects the
// connect callback to fail, no data to have been read, and the read callback
// to report success (treated below as the EOF indication).
// NOTE(review): the close call named in the title is on a line not visible in
// this excerpt.
380 TEST(AsyncSocketTest, ConnectReadAndClose) {
385 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
387 socket->connect(&ccb, server.getAddress(), 30);
389 // Hopefully the connect didn't succeed immediately.
390 // If it did, we can't exercise the close-while-connecting code path.
391 if (ccb.state == STATE_SUCCEEDED) {
392 LOG(INFO) << "connect() succeeded immediately; aborting test "
393 "of read-during-connect behavior";
398 socket->setReadCB(&rcb);
403 // Loop, although there shouldn't be anything to do.
406 CHECK_EQ(ccb.state, STATE_FAILED); // we aborted the connect attempt
407 CHECK_EQ(rcb.buffers.size(), 0);
408 CHECK_EQ(rcb.state, STATE_SUCCEEDED); // this indicates EOF
412 * Test both writing and installing a read callback immediately,
413 * before connect() finishes.
// Queues a write of buf1 ('a' bytes) and installs a read callback right after
// connect(); the accepted peer sends buf2 ('b' bytes) and half-closes. Checks
// that each side received the other's buffer and that the peer then sees EOF.
415 TEST(AsyncSocketTest, ConnectWriteAndRead) {
420 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
422 socket->connect(&ccb, server.getAddress(), 30);
426 memset(buf1, 'a', sizeof(buf1));
428 socket->write(&wcb, buf1, sizeof(buf1));
430 // set a read callback
432 socket->setReadCB(&rcb);
434 // Even though we haven't looped yet, we should be able to accept
435 // the connection and send data to it.
436 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
438 memset(buf2, 'b', sizeof(buf2));
439 acceptedSocket->write(buf2, sizeof(buf2));
440 acceptedSocket->flush();
442 // shut down the write half of acceptedSocket, so that the AsyncSocket
443 // will stop reading and we can break out of the event loop.
444 shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
449 // Make sure the connect succeeded
450 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
452 // Make sure the AsyncSocket read the data written by the accepted socket
453 CHECK_EQ(rcb.state, STATE_SUCCEEDED);
454 CHECK_EQ(rcb.buffers.size(), 1);
455 CHECK_EQ(rcb.buffers[0].length, sizeof(buf2));
456 CHECK_EQ(memcmp(rcb.buffers[0].buffer, buf2, sizeof(buf2)), 0);
458 // Close the AsyncSocket so we'll see EOF on acceptedSocket
461 // Make sure the accepted socket saw the data written by the AsyncSocket
// readbuf is sized to buf1 so readAll() consumes exactly what was written.
462 uint8_t readbuf[sizeof(buf1)];
463 acceptedSocket->readAll(readbuf, sizeof(readbuf));
464 CHECK_EQ(memcmp(buf1, readbuf, sizeof(buf1)), 0);
// A further read returning 0 bytes is the EOF check.
465 uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
466 CHECK_EQ(bytesRead, 0);
470 * Test writing to the socket then shutting down writes before the connect attempt finishes.
// Queues a write and calls shutdownWrite() while the connect is still pending.
// After the connection completes, checks that the peer received wbuf followed
// by EOF, and that the AsyncSocket can still read the data the peer wrote
// (acceptedWbuf) through a read callback installed afterwards.
473 TEST(AsyncSocketTest, ConnectWriteAndShutdownWrite) {
478 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
480 socket->connect(&ccb, server.getAddress(), 30);
482 // Hopefully the connect didn't succeed immediately.
483 // If it did, we can't exercise the write-while-connecting code path.
484 if (ccb.state == STATE_SUCCEEDED) {
485 LOG(INFO) << "connect() succeeded immediately; skipping test";
489 // Ask to write some data
491 memset(wbuf, 'a', sizeof(wbuf));
493 socket->write(&wcb, wbuf, sizeof(wbuf));
494 socket->shutdownWrite();
// shutdownWrite() is invoked a second time here.
// NOTE(review): presumably intentional, to exercise repeated calls -- confirm.
497 socket->shutdownWrite();
499 // Even though we haven't looped yet, we should be able to accept
501 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
503 // Since the connection is still in progress, there should be no data to
504 // read yet. Verify that the accepted socket is not readable.
505 struct pollfd fds[1];
506 fds[0].fd = acceptedSocket->getSocketFD();
507 fds[0].events = POLLIN;
// Timeout of 0: poll() reports the current state without blocking.
509 int rc = poll(fds, 1, 0);
512 // Write data to the accepted socket
513 uint8_t acceptedWbuf[192];
514 memset(acceptedWbuf, 'b', sizeof(acceptedWbuf));
515 acceptedSocket->write(acceptedWbuf, sizeof(acceptedWbuf));
516 acceptedSocket->flush();
521 // The loop should have completed the connection, written the queued data,
522 // and shutdown writes on the socket.
524 // Check that the connection was completed successfully and that the write
525 // callback succeeded.
526 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
527 CHECK_EQ(wcb.state, STATE_SUCCEEDED);
529 // Check that we can read the data that was written to the socket, and that
530 // we see an EOF, since its socket was half-shutdown.
531 uint8_t readbuf[sizeof(wbuf)];
532 acceptedSocket->readAll(readbuf, sizeof(readbuf));
533 CHECK_EQ(memcmp(wbuf, readbuf, sizeof(wbuf)), 0);
534 uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
535 CHECK_EQ(bytesRead, 0);
537 // Close the accepted socket. This will cause it to see EOF
538 // and uninstall the read callback when we loop next.
539 acceptedSocket->close();
541 // Install a read callback, then loop again.
543 socket->setReadCB(&rcb);
546 // This loop should have read the data and seen the EOF
547 CHECK_EQ(rcb.state, STATE_SUCCEEDED);
548 CHECK_EQ(rcb.buffers.size(), 1);
549 CHECK_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
550 CHECK_EQ(memcmp(rcb.buffers[0].buffer,
551 acceptedWbuf, sizeof(acceptedWbuf)), 0);
555 * Test reading, writing, and shutting down writes before the connect attempt finishes.
// Installs a read callback, queues a write and calls shutdownWrite() while the
// connect is still pending. The peer sends acceptedWbuf and half-closes.
// Checks that the AsyncSocket read acceptedWbuf, that its own write succeeded,
// and that the peer received wbuf followed by EOF.
558 TEST(AsyncSocketTest, ConnectReadWriteAndShutdownWrite) {
563 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
565 socket->connect(&ccb, server.getAddress(), 30);
567 // Hopefully the connect didn't succeed immediately.
568 // If it did, we can't exercise the write-while-connecting code path.
569 if (ccb.state == STATE_SUCCEEDED) {
570 LOG(INFO) << "connect() succeeded immediately; skipping test";
574 // Install a read callback
576 socket->setReadCB(&rcb);
578 // Ask to write some data
580 memset(wbuf, 'a', sizeof(wbuf));
582 socket->write(&wcb, wbuf, sizeof(wbuf));
585 socket->shutdownWrite();
587 // Even though we haven't looped yet, we should be able to accept
589 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
591 // Since the connection is still in progress, there should be no data to
592 // read yet. Verify that the accepted socket is not readable.
593 struct pollfd fds[1];
594 fds[0].fd = acceptedSocket->getSocketFD();
595 fds[0].events = POLLIN;
// Timeout of 0: poll() reports the current state without blocking.
597 int rc = poll(fds, 1, 0);
600 // Write data to the accepted socket
601 uint8_t acceptedWbuf[192];
602 memset(acceptedWbuf, 'b', sizeof(acceptedWbuf));
603 acceptedSocket->write(acceptedWbuf, sizeof(acceptedWbuf));
604 acceptedSocket->flush();
605 // Shutdown writes to the accepted socket. This will cause it to see EOF
606 // and uninstall the read callback.
607 ::shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
612 // The loop should have completed the connection, written the queued data,
613 // shutdown writes on the socket, read the data we wrote to it, and see the
616 // Check that the connection was completed successfully and that the read
617 // and write callbacks were invoked as expected.
618 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
619 CHECK_EQ(rcb.state, STATE_SUCCEEDED);
620 CHECK_EQ(rcb.buffers.size(), 1);
621 CHECK_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
622 CHECK_EQ(memcmp(rcb.buffers[0].buffer,
623 acceptedWbuf, sizeof(acceptedWbuf)), 0);
624 CHECK_EQ(wcb.state, STATE_SUCCEEDED);
626 // Check that we can read the data that was written to the socket, and that
627 // we see an EOF, since its socket was half-shutdown.
628 uint8_t readbuf[sizeof(wbuf)];
629 acceptedSocket->readAll(readbuf, sizeof(readbuf));
630 CHECK_EQ(memcmp(wbuf, readbuf, sizeof(wbuf)), 0);
631 uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
632 CHECK_EQ(bytesRead, 0);
634 // Fully close both sockets
635 acceptedSocket->close();
640 * Test reading, writing, and calling shutdownWriteNow() before the
641 * connect attempt finishes.
// Installs a read callback, queues a write and then calls shutdownWriteNow()
// while the connect is still pending. Expects the queued write to fail with 0
// bytes written, the AsyncSocket to still read acceptedWbuf from the peer, and
// the peer to see EOF without receiving any of wbuf.
643 TEST(AsyncSocketTest, ConnectReadWriteAndShutdownWriteNow) {
648 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
650 socket->connect(&ccb, server.getAddress(), 30);
652 // Hopefully the connect didn't succeed immediately.
653 // If it did, we can't exercise the write-while-connecting code path.
654 if (ccb.state == STATE_SUCCEEDED) {
655 LOG(INFO) << "connect() succeeded immediately; skipping test";
659 // Install a read callback
661 socket->setReadCB(&rcb);
663 // Ask to write some data
665 memset(wbuf, 'a', sizeof(wbuf));
667 socket->write(&wcb, wbuf, sizeof(wbuf));
669 // Shutdown writes immediately.
670 // This should immediately discard the data that we just tried to write.
671 socket->shutdownWriteNow();
673 // Verify that writeError() was invoked on the write callback.
674 CHECK_EQ(wcb.state, STATE_FAILED);
675 CHECK_EQ(wcb.bytesWritten, 0);
677 // Even though we haven't looped yet, we should be able to accept
679 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
681 // Since the connection is still in progress, there should be no data to
682 // read yet. Verify that the accepted socket is not readable.
683 struct pollfd fds[1];
684 fds[0].fd = acceptedSocket->getSocketFD();
685 fds[0].events = POLLIN;
// Timeout of 0: poll() reports the current state without blocking.
687 int rc = poll(fds, 1, 0);
690 // Write data to the accepted socket
691 uint8_t acceptedWbuf[192];
692 memset(acceptedWbuf, 'b', sizeof(acceptedWbuf));
693 acceptedSocket->write(acceptedWbuf, sizeof(acceptedWbuf));
694 acceptedSocket->flush();
695 // Shutdown writes to the accepted socket. This will cause it to see EOF
696 // and uninstall the read callback.
697 ::shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
702 // The loop should have completed the connection, written the queued data,
703 // shutdown writes on the socket, read the data we wrote to it, and see the
706 // Check that the connection was completed successfully and that the read
707 // callback was invoked as expected.
708 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
709 CHECK_EQ(rcb.state, STATE_SUCCEEDED);
710 CHECK_EQ(rcb.buffers.size(), 1);
711 CHECK_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
712 CHECK_EQ(memcmp(rcb.buffers[0].buffer,
713 acceptedWbuf, sizeof(acceptedWbuf)), 0);
715 // Since we used shutdownWriteNow(), it should have discarded all pending
716 // write data. Verify we see an immediate EOF when reading from the accepted
718 uint8_t readbuf[sizeof(wbuf)];
// Unlike the shutdownWrite() variant there is no readAll() first: the very
// first read is expected to return 0 bytes.
719 uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
720 CHECK_EQ(bytesRead, 0);
722 // Fully close both sockets
723 acceptedSocket->close();
727 // Helper function for use in testConnectOptWrite()
728 // Temporarily disable the read callback
// socket: socket whose read callback is removed and later re-installed.
// rcb:    callback to re-install via AsyncSocket::setReadCB.
729 void tmpDisableReads(AsyncSocket* socket, ReadCallback* rcb) {
730 // Uninstall the read callback
731 socket->setReadCB(nullptr);
732 // Schedule the read callback to be reinstalled after 1ms
// NOTE(review): runInLoop() is given no delay here, so the "1ms" above looks
// stale -- presumably the callback runs on a later event-loop pass; confirm.
733 socket->getEventBase()->runInLoop(
734 std::bind(&AsyncSocket::setReadCB, socket, rcb));
738 * Test connect+write, then have the connect callback perform another write.
740 * This tests interaction of the optimistic writing after connect with
741 * additional write attempts that occur in the connect callback.
// Shared driver for the ConnectCallbackWrite scenarios.
//   size1: number of 'a' bytes written immediately, before connect finishes.
//   size2: number of 'b' bytes written from the connect success callback.
//   close: defaults to false; NOTE(review): the code that acts on it is not
//          visible in this excerpt -- presumably it triggers the early close
//          mentioned further down; confirm.
// At the end the received buffers are compared against buf1 followed by buf2
// and the total byte count is checked against size1 + size2.
743 void testConnectOptWrite(size_t size1, size_t size2, bool close = false) {
746 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
750 socket->connect(&ccb, server.getAddress(), 30);
752 // Hopefully the connect didn't succeed immediately.
753 // If it did, we can't exercise the optimistic write code path.
754 if (ccb.state == STATE_SUCCEEDED) {
755 LOG(INFO) << "connect() succeeded immediately; aborting test "
756 "of optimistic write behavior";
760 // Tell the connect callback to perform a write when the connect succeeds
762 scoped_array<char> buf2(new char[size2]);
763 memset(buf2.get(), 'b', size2);
// The lambdas capture by reference; socket, wcb2 and buf2 are locals of this
// function.
765 ccb.successCallback = [&] { socket->write(&wcb2, buf2.get(), size2); };
766 // Tell the second write callback to close the connection when it is done
767 wcb2.successCallback = [&] { socket->closeNow(); };
770 // Schedule one write() immediately, before the connect finishes
771 scoped_array<char> buf1(new char[size1]);
772 memset(buf1.get(), 'a', size1);
775 socket->write(&wcb1, buf1.get(), size1);
779 // immediately perform a close, before connect() completes
783 // Start reading from the other endpoint after 10ms.
784 // If we're using large buffers, we have to read so that the writes don't
786 std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
// Each time data arrives, reads are paused via tmpDisableReads().
788 rcb.dataAvailableCallback = std::bind(tmpDisableReads,
789 acceptedSocket.get(), &rcb);
790 socket->getEventBase()->tryRunAfterDelay(
791 std::bind(&AsyncSocket::setReadCB, acceptedSocket.get(), &rcb),
// NOTE(review): the next comment says the server socket is not accepted yet,
// but acceptAsync() is called above -- the comment looks stale; confirm.
794 // Loop. We don't bother accepting on the server socket yet.
795 // The kernel should be able to buffer the write request so it can succeed.
798 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
800 CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
803 CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
808 // Make sure the read callback received all of the data
809 size_t bytesRead = 0;
810 for (vector<ReadCallback::Buffer>::const_iterator it = rcb.buffers.begin();
811 it != rcb.buffers.end();
// [start, end) is this buffer's byte range within the overall stream.
813 size_t start = bytesRead;
814 bytesRead += it->length;
815 size_t end = bytesRead;
// Portion of this buffer that falls inside the first write (buf1).
817 size_t cmpLen = min(size1, end) - start;
818 CHECK_EQ(memcmp(it->buffer, buf1.get() + start, cmpLen), 0);
// Portion of this buffer that falls inside the second write (buf2).
820 if (end > size1 && end <= size1 + size2) {
824 if (start >= size1) {
826 buf2Offset = start - size1;
827 cmpLen = end - start;
829 itOffset = size1 - start;
831 cmpLen = end - size1;
833 CHECK_EQ(memcmp(it->buffer + itOffset, buf2.get() + buf2Offset,
838 CHECK_EQ(bytesRead, size1 + size2);
// Runs testConnectOptWrite() over combinations of small and large (8 MiB)
// write sizes, including zero-length cases and the close=true variant.
841 TEST(AsyncSocketTest, ConnectCallbackWrite) {
842 // Test using small writes that should both succeed immediately
843 testConnectOptWrite(100, 200);
845 // Test using a large buffer in the connect callback, that should block
// 8*1024*1024 bytes = 8 MiB.
846 const size_t largeSize = 8*1024*1024;
847 testConnectOptWrite(100, largeSize);
849 // Test using a large initial write
850 testConnectOptWrite(largeSize, 100);
852 // Test using two large buffers
853 testConnectOptWrite(largeSize, largeSize);
855 // Test a small write in the connect callback,
856 // but no immediate write before connect completes
857 testConnectOptWrite(0, 64);
859 // Test a large write in the connect callback,
860 // but no immediate write before connect completes
861 testConnectOptWrite(0, largeSize);
863 // Test connect, a small write, then immediately call close() before connect
865 testConnectOptWrite(211, 0, true);
867 // Test connect, a large immediate write (that will block), then immediately
868 // call close() before connect completes
869 testConnectOptWrite(largeSize, 0, true);
872 ///////////////////////////////////////////////////////////////////////////
873 // write() related tests
874 ///////////////////////////////////////////////////////////////////////////
877 * Test writing using a nullptr callback
// Connects, then calls write() with a nullptr callback and relies on
// server.verifyConnection() to confirm that the data arrived.
879 TEST(AsyncSocketTest, WriteNullCallback) {
// This newSocket() overload is given the server address and 30, so the
// connect is started as part of construction.
884 std::shared_ptr<AsyncSocket> socket =
885 AsyncSocket::newSocket(&evb, server.getAddress(), 30);
886 evb.loop(); // loop until the socket is connected
888 // write() with a nullptr callback
890 memset(buf, 'a', sizeof(buf));
891 socket->write(nullptr, buf, sizeof(buf));
893 evb.loop(); // loop until the data is sent
895 // Make sure the server got a connection and received the data
897 server.verifyConnection(buf, sizeof(buf));
901 * Test writing with a send timeout
// Sets a send timeout of 200 on the socket, writes 8 MiB with nobody reading
// on the other end, and expects the write callback to fail with TIMED_OUT
// within the tolerance passed to T_CHECK_TIMEOUT.
903 TEST(AsyncSocketTest, WriteTimeout) {
908 std::shared_ptr<AsyncSocket> socket =
909 AsyncSocket::newSocket(&evb, server.getAddress(), 30);
910 evb.loop(); // loop until the socket is connected
912 // write() a large chunk of data, with no-one on the other end reading
913 size_t writeLength = 8*1024*1024;
// The same value is later wrapped in milliseconds() for the timing check.
914 uint32_t timeout = 200;
915 socket->setSendTimeout(timeout);
916 scoped_array<char> buf(new char[writeLength]);
917 memset(buf.get(), 'a', writeLength);
919 socket->write(&wcb, buf.get(), writeLength);
925 // Make sure the write attempt timed out as requested
926 CHECK_EQ(wcb.state, STATE_FAILED);
927 CHECK_EQ(wcb.exception.getType(), AsyncSocketException::TIMED_OUT);
929 // Check that the write timed out within a reasonable period of time.
930 // We don't check for exactly the specified timeout, since AsyncSocket only
931 // times out when it hasn't made progress for that period of time.
933 // On linux, the first write sends a few hundred kb of data, then blocks for
934 // writability, and then unblocks again after 40ms and is able to write
935 // another smaller chunk of data before blocking permanently. Therefore it
936 // doesn't time out until 40ms + timeout.
938 // I haven't fully verified the cause of this, but I believe it probably
939 // occurs because the receiving end delays sending an ack for up to 40ms.
940 // (This is the default value for TCP_DELACK_MIN.) Once the sender receives
941 // the ack, it can send some more data. However, after that point the
942 // receiver's kernel buffer is full. This 40ms delay happens even with
943 // TCP_NODELAY and TCP_QUICKACK enabled on both endpoints. However, the
944 // kernel may be automatically disabling TCP_QUICKACK after receiving some
947 // For now, we simply check that the timeout occurred within 160ms of
948 // the requested value.
949 T_CHECK_TIMEOUT(start, end, milliseconds(timeout), milliseconds(160));
953 * Test writing to a socket that the remote endpoint has closed
// Connects, has the server accept and immediately drop the connection, then
// writes 8 MiB and expects the write callback to fail with INTERNAL_ERROR.
955 TEST(AsyncSocketTest, WritePipeError) {
960 std::shared_ptr<AsyncSocket> socket =
961 AsyncSocket::newSocket(&evb, server.getAddress(), 30);
962 socket->setSendTimeout(1000);
963 evb.loop(); // loop until the socket is connected
965 // accept and immediately close the socket
966 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
// Dropping the only visible shared_ptr reference is what closes the peer.
967 acceptedSocket.reset();
969 // write() a large chunk of data
970 size_t writeLength = 8*1024*1024;
971 scoped_array<char> buf(new char[writeLength]);
972 memset(buf.get(), 'a', writeLength);
974 socket->write(&wcb, buf.get(), writeLength);
978 // Make sure the write failed.
979 // It would be nice if AsyncSocketException could convey the errno value,
980 // so that we could check for EPIPE
981 CHECK_EQ(wcb.state, STATE_FAILED);
982 CHECK_EQ(wcb.exception.getType(),
983 AsyncSocketException::INTERNAL_ERROR);
987 * Test writing a mix of simple buffers and IOBufs
// Writes, in order, a plain 5-byte buffer ('a'), a single-element IOBuf chain
// (7 x 'b') and a two-element IOBuf chain (11 x 'c' + 13 x 'd'), then shuts
// down writes. Checks that all three write callbacks succeed and that the
// reader received one buffer containing the concatenation in that order.
989 TEST(AsyncSocketTest, WriteIOBuf) {
994 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
996 socket->connect(&ccb, server.getAddress(), 30);
998 // Accept the connection
999 std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
1001 acceptedSocket->setReadCB(&rcb);
1003 // Write a simple buffer to the socket
// const so that the array bound below is a constant expression; a non-const
// bound would make simpleBuf a variable-length array, which is not standard
// C++.
1004 const size_t simpleBufLength = 5;
1005 char simpleBuf[simpleBufLength];
1006 memset(simpleBuf, 'a', simpleBufLength);
1008 socket->write(&wcb, simpleBuf, simpleBufLength);
1010 // Write a single-element IOBuf chain
1011 size_t buf1Length = 7;
1012 unique_ptr<IOBuf> buf1(IOBuf::create(buf1Length));
1013 memset(buf1->writableData(), 'b', buf1Length);
1014 buf1->append(buf1Length);
// Keep a clone for the comparison below; buf1 itself is moved into the write.
1015 unique_ptr<IOBuf> buf1Copy(buf1->clone());
1017 socket->writeChain(&wcb2, std::move(buf1));
1019 // Write a multiple-element IOBuf chain
1020 size_t buf2Length = 11;
1021 unique_ptr<IOBuf> buf2(IOBuf::create(buf2Length));
1022 memset(buf2->writableData(), 'c', buf2Length);
1023 buf2->append(buf2Length);
1024 size_t buf3Length = 13;
1025 unique_ptr<IOBuf> buf3(IOBuf::create(buf3Length));
1026 memset(buf3->writableData(), 'd', buf3Length);
1027 buf3->append(buf3Length);
1028 buf2->appendChain(std::move(buf3));
// Clone and coalesce so the expected bytes can be compared with one memcmp.
1029 unique_ptr<IOBuf> buf2Copy(buf2->clone());
1030 buf2Copy->coalesce();
1032 socket->writeChain(&wcb3, std::move(buf2));
1033 socket->shutdownWrite();
1035 // Let the reads and writes run to completion
1038 CHECK_EQ(wcb.state, STATE_SUCCEEDED);
1039 CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
1040 CHECK_EQ(wcb3.state, STATE_SUCCEEDED);
1042 // Make sure the reader got the right data in the right order
1043 CHECK_EQ(rcb.state, STATE_SUCCEEDED);
1044 CHECK_EQ(rcb.buffers.size(), 1);
1045 CHECK_EQ(rcb.buffers[0].length,
1046 simpleBufLength + buf1Length + buf2Length + buf3Length);
1048 memcmp(rcb.buffers[0].buffer, simpleBuf, simpleBufLength), 0);
1050 memcmp(rcb.buffers[0].buffer + simpleBufLength,
1051 buf1Copy->data(), buf1Copy->length()), 0);
1053 memcmp(rcb.buffers[0].buffer + simpleBufLength + buf1Length,
1054 buf2Copy->data(), buf2Copy->length()), 0);
1056 acceptedSocket->close();
// Writes three IOBufs: the first immediately, the second via a DelayedWrite
// scheduled at 100 with cork=true, the third via a DelayedWrite scheduled at
// 200 with cork=false and lastWrite=true. Expects the reader to see two
// buffers: the first write alone, then the second and third together.
1060 TEST(AsyncSocketTest, WriteIOBufCorked) {
1065 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
1067 socket->connect(&ccb, server.getAddress(), 30);
1069 // Accept the connection
1070 std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
1072 acceptedSocket->setReadCB(&rcb);
1074 // Do three writes, 100ms apart, with the "cork" flag set
1075 // on the second write. The reader should see the first write
1076 // arrive by itself, followed by the second and third writes
1077 // arriving together.
1078 size_t buf1Length = 5;
1079 unique_ptr<IOBuf> buf1(IOBuf::create(buf1Length));
1080 memset(buf1->writableData(), 'a', buf1Length);
1081 buf1->append(buf1Length);
1082 size_t buf2Length = 7;
1083 unique_ptr<IOBuf> buf2(IOBuf::create(buf2Length));
1084 memset(buf2->writableData(), 'b', buf2Length);
1085 buf2->append(buf2Length);
1086 size_t buf3Length = 11;
1087 unique_ptr<IOBuf> buf3(IOBuf::create(buf3Length));
1088 memset(buf3->writableData(), 'c', buf3Length);
1089 buf3->append(buf3Length);
1091 socket->writeChain(&wcb1, std::move(buf1));
// Second write: corked.
1093 DelayedWrite write2(socket, std::move(buf2), &wcb2, true);
1094 write2.scheduleTimeout(100);
// Third write: not corked, flagged as the last write.
1096 DelayedWrite write3(socket, std::move(buf3), &wcb3, false, true);
1097 write3.scheduleTimeout(200);
1100 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
1101 CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
1102 CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
// Rethrow the stored exception so a failure of the third write shows its
// cause rather than just a state mismatch.
1103 if (wcb3.state != STATE_SUCCEEDED) {
1104 throw(wcb3.exception);
1106 CHECK_EQ(wcb3.state, STATE_SUCCEEDED);
1108 // Make sure the reader got the data with the right grouping
1109 CHECK_EQ(rcb.state, STATE_SUCCEEDED);
1110 CHECK_EQ(rcb.buffers.size(), 2);
1111 CHECK_EQ(rcb.buffers[0].length, buf1Length);
1112 CHECK_EQ(rcb.buffers[1].length, buf2Length + buf3Length);
1114 acceptedSocket->close();
1119 * Test performing a zero-length write
// Interleaves two zero-length write() calls with two 1 MiB writes and checks
// that all four write callbacks succeed and that the reader received the
// len1 + len2 payload bytes.
1121 TEST(AsyncSocketTest, ZeroLengthWrite) {
1126 std::shared_ptr<AsyncSocket> socket =
1127 AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1128 evb.loop(); // loop until the socket is connected
1130 auto acceptedSocket = server.acceptAsync(&evb);
1132 acceptedSocket->setReadCB(&rcb);
1134 size_t len1 = 1024*1024;
1135 size_t len2 = 1024*1024;
1136 std::unique_ptr<char[]> buf(new char[len1 + len2]);
// First half is 'a', second half is 'b'. The second fill must start at
// buf + len1; starting at buf would overwrite the 'a' region and leave the
// second half uninitialized while it is still written to the socket.
1137 memset(buf.get(), 'a', len1);
1138 memset(buf.get() + len1, 'b', len2);
// Zero-length writes are placed before each real write.
1144 socket->write(&wcb1, buf.get(), 0);
1145 socket->write(&wcb2, buf.get(), len1);
1146 socket->write(&wcb3, buf.get() + len1, 0);
1147 socket->write(&wcb4, buf.get() + len1, len2);
1150 evb.loop(); // loop until the data is sent
1152 CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
1153 CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
1154 CHECK_EQ(wcb3.state, STATE_SUCCEEDED);
1155 CHECK_EQ(wcb4.state, STATE_SUCCEEDED);
1156 rcb.verifyData(buf.get(), len1 + len2);
// Issues a single writev() over four iovec entries, two of which carry the
// 1 MiB payload halves, and checks that the write callback succeeds and that
// the reader received the len1 + len2 payload bytes.
// NOTE(review): the iov_len assignments for entries 1 and 3 are on lines not
// visible in this excerpt -- presumably zero, per the test name; confirm.
1159 TEST(AsyncSocketTest, ZeroLengthWritev) {
1164 std::shared_ptr<AsyncSocket> socket =
1165 AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1166 evb.loop(); // loop until the socket is connected
1168 auto acceptedSocket = server.acceptAsync(&evb);
1170 acceptedSocket->setReadCB(&rcb);
1172 size_t len1 = 1024*1024;
1173 size_t len2 = 1024*1024;
1174 std::unique_ptr<char[]> buf(new char[len1 + len2]);
// First half is 'a', second half is 'b'. The second fill must start at
// buf + len1; starting at buf would overwrite the 'a' region and leave the
// second half uninitialized while it is still written to the socket.
1175 memset(buf.get(), 'a', len1);
1176 memset(buf.get() + len1, 'b', len2);
// const so that the array bound is a constant expression rather than a
// variable-length array, which is not standard C++.
1179 const size_t iovCount = 4;
1180 struct iovec iov[iovCount];
1181 iov[0].iov_base = buf.get();
1182 iov[0].iov_len = len1;
1183 iov[1].iov_base = buf.get() + len1;
1185 iov[2].iov_base = buf.get() + len1;
1186 iov[2].iov_len = len2;
// One-past-the-end pointer of the buffer.
1187 iov[3].iov_base = buf.get() + len1 + len2;
1190 socket->writev(&wcb, iov, iovCount);
1192 evb.loop(); // loop until the data is sent
1194 CHECK_EQ(wcb.state, STATE_SUCCEEDED);
1195 rcb.verifyData(buf.get(), len1 + len2);
1198 ///////////////////////////////////////////////////////////////////////////
1199 // close() related tests
1200 ///////////////////////////////////////////////////////////////////////////
1203 * Test calling close() with pending writes when the socket is already closing.
// Connects a client socket, issues writes until five write callbacks have been
// collected, and gives each collected callback an error handler that calls
// close() on the socket. Every collected write callback is then expected to
// be in STATE_FAILED.
1205 TEST(AsyncSocketTest, ClosePendingWritesWhileClosing) {
1210 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
1212 socket->connect(&ccb, server.getAddress(), 30);
1214 // accept the socket on the server side
1215 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
1217 // Loop to ensure the connect has completed
1220 // Make sure we are connected
1221 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
1223 // Schedule pending writes, until several write attempts have blocked
1225 memset(buf, 'a', sizeof(buf));
1226 typedef vector< std::shared_ptr<WriteCallback> > WriteCallbackVector;
1227 WriteCallbackVector writeCallbacks;
// The vector is sized for exactly the five callbacks the loop below collects.
1229 writeCallbacks.reserve(5);
1230 while (writeCallbacks.size() < 5) {
1231 std::shared_ptr<WriteCallback> wcb(new WriteCallback);
1233 socket->write(wcb.get(), buf, sizeof(buf));
1234 if (wcb->state == STATE_SUCCEEDED) {
1235 // Succeeded immediately. Keep performing more writes
1239 // This write is blocked.
1240 // Have the write callback call close() when writeError() is invoked
// The callback binds the raw socket pointer; the shared_ptr `socket` stays
// in scope for the rest of the test.
1241 wcb->errorCallback = std::bind(&AsyncSocket::close, socket.get());
1242 writeCallbacks.push_back(wcb);
1245 // Call closeNow() to immediately fail the pending writes
1248 // Make sure writeError() was invoked on all of the pending write callbacks
1249 for (WriteCallbackVector::const_iterator it = writeCallbacks.begin();
1250 it != writeCallbacks.end();
1252 CHECK_EQ((*it)->state, STATE_FAILED);
1256 ///////////////////////////////////////////////////////////////////////////
1257 // ImmediateRead related tests
1258 ///////////////////////////////////////////////////////////////////////////
1260 /* AsyncSocket subclass used to verify that immediate read works */
// AsyncSocket subclass whose checkForImmediateRead() override records that it
// was invoked and then calls AsyncSocket::handleRead().
1261 class AsyncSocketImmediateRead : public folly::AsyncSocket {
// Set to true by checkForImmediateRead(); tests reset and inspect it directly.
1263 bool immediateReadCalled = false;
1264 explicit AsyncSocketImmediateRead(folly::EventBase* evb) : AsyncSocket(evb) {}
// Records the call in immediateReadCalled, then runs the base read handler.
1266 void checkForImmediateRead() noexcept override {
1267 immediateReadCalled = true;
1268 AsyncSocket::handleRead();
// Echo test for the immediate-read hook: the client writes expectedDataSz
// bytes, the server-side read callback writes them back and closes once it has
// read all of them, and the client is then expected to have received the same
// data with checkForImmediateRead() having been called.
1272 TEST(AsyncSocket, ConnectReadImmediateRead) {
// The payload is three read buffers long, while the client socket is limited
// to one read per event.
1275 const size_t maxBufferSz = 100;
1276 const size_t maxReadsPerEvent = 1;
1277 const size_t expectedDataSz = maxBufferSz * 3;
1278 char expectedData[expectedDataSz];
1279 memset(expectedData, 'j', expectedDataSz);
1282 ReadCallback rcb(maxBufferSz);
1283 AsyncSocketImmediateRead socket(&evb);
1284 socket.connect(nullptr, server.getAddress(), 30);
1286 evb.loop(); // loop until the socket is connected
1288 socket.setReadCB(&rcb);
1289 socket.setMaxReadsPerEvent(maxReadsPerEvent);
// Clear the flag so the final check only reflects calls made after this point.
1290 socket.immediateReadCalled = false;
1292 auto acceptedSocket = server.acceptAsync(&evb);
1294 ReadCallback rcbServer;
1295 WriteCallback wcbServer;
// Server side: once the full payload has arrived, verify it, echo it back and
// close the accepted socket.
1296 rcbServer.dataAvailableCallback = [&]() {
1297 if (rcbServer.dataRead() == expectedDataSz) {
1298 // write back all data read
1299 rcbServer.verifyData(expectedData, expectedDataSz);
1300 acceptedSocket->write(&wcbServer, expectedData, expectedDataSz);
1301 acceptedSocket->close();
1304 acceptedSocket->setReadCB(&rcbServer);
1308 socket.write(&wcb1, expectedData, expectedDataSz);
1310 CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
1311 rcb.verifyData(expectedData, expectedDataSz);
1312 CHECK_EQ(socket.immediateReadCalled, true);
// Same echo setup as the immediate-read test, except that the client's read
// callback uninstalls itself the first time data is available. The client is
// then expected to have read only maxBufferSz bytes and
// checkForImmediateRead() is expected not to have been called.
1315 TEST(AsyncSocket, ConnectReadUninstallRead) {
// The payload is three read buffers long, while the client socket is limited
// to one read per event.
1318 const size_t maxBufferSz = 100;
1319 const size_t maxReadsPerEvent = 1;
1320 const size_t expectedDataSz = maxBufferSz * 3;
1321 char expectedData[expectedDataSz];
1322 memset(expectedData, 'k', expectedDataSz);
1325 ReadCallback rcb(maxBufferSz);
1326 AsyncSocketImmediateRead socket(&evb);
1327 socket.connect(nullptr, server.getAddress(), 30);
1329 evb.loop(); // loop until the socket is connected
1331 socket.setReadCB(&rcb);
1332 socket.setMaxReadsPerEvent(maxReadsPerEvent);
// Clear the flag so the final check only reflects calls made after this point.
1333 socket.immediateReadCalled = false;
1335 auto acceptedSocket = server.acceptAsync(&evb);
1337 ReadCallback rcbServer;
1338 WriteCallback wcbServer;
// Server side: once the full payload has arrived, verify it, echo it back and
// close the accepted socket.
1339 rcbServer.dataAvailableCallback = [&]() {
1340 if (rcbServer.dataRead() == expectedDataSz) {
1341 // write back all data read
1342 rcbServer.verifyData(expectedData, expectedDataSz);
1343 acceptedSocket->write(&wcbServer, expectedData, expectedDataSz);
1344 acceptedSocket->close();
1347 acceptedSocket->setReadCB(&rcbServer);
// Client side: drop the read callback as soon as the first data arrives.
1349 rcb.dataAvailableCallback = [&]() {
1350 // we read data and reset readCB
1351 socket.setReadCB(nullptr);
1356 socket.write(&wcb, expectedData, expectedDataSz);
1358 CHECK_EQ(wcb.state, STATE_SUCCEEDED);
1360 /* we should've only read maxBufferSz data since readCallback_
1361 * was reset in dataAvailableCallback */
1362 CHECK_EQ(rcb.dataRead(), maxBufferSz);
1363 CHECK_EQ(socket.immediateReadCalled, false);
1367 // - Test connect() and have the connect callback set the read callback
1368 // - Test connect() and have the connect callback unset the read callback
1369 // - Test reading/writing/closing/destroying the socket in the connect callback
1370 // - Test reading/writing/closing/destroying the socket in the read callback
1371 // - Test reading/writing/closing/destroying the socket in the write callback
1372 // - Test one-way shutdown behavior
1373 // - Test changing the EventBase
1375 // - TODO: test multiple threads sharing an AsyncSocket, and detaching from it
1376 // in connectSuccess(), readDataAvailable(), writeSuccess()
1379 ///////////////////////////////////////////////////////////////////////////
1380 // AsyncServerSocket tests
1381 ///////////////////////////////////////////////////////////////////////////
1384 * Helper AcceptCallback class for the test code
1385 * It records the callbacks that were invoked, and also supports calling
1386 * generic std::function objects in each callback.
// AcceptCallback implementation for tests: every callback appends an EventInfo
// record to events_ and then, if one has been installed, invokes the matching
// user-supplied std::function hook.
1388 class TestAcceptCallback : public AsyncServerSocket::AcceptCallback {
// EventInfo constructors: one for an accepted connection (fd + address), one
// for an error message, and one taking only an event type.
1397 EventInfo(int fd, const folly::SocketAddress& addr)
1398 : type(TYPE_ACCEPT),
1402 explicit EventInfo(const std::string& msg)
1407 explicit EventInfo(EventType et)
1414 int fd; // valid for TYPE_ACCEPT
1415 folly::SocketAddress address; // valid for TYPE_ACCEPT
1416 string errorMsg; // valid for TYPE_ERROR
1418 typedef std::deque<EventInfo> EventList;
1420 TestAcceptCallback()
1421 : connectionAcceptedFn_(),
// Gives tests access to the recorded event sequence.
1426 std::deque<EventInfo>* getEvents() {
// Setters for the optional hooks; each one copies the supplied function.
1430 void setConnectionAcceptedFn(
1431 const std::function<void(int, const folly::SocketAddress&)>& fn) {
1432 connectionAcceptedFn_ = fn;
1434 void setAcceptErrorFn(const std::function<void(const std::exception&)>& fn) {
1435 acceptErrorFn_ = fn;
1437 void setAcceptStartedFn(const std::function<void()>& fn) {
1438 acceptStartedFn_ = fn;
1440 void setAcceptStoppedFn(const std::function<void()>& fn) {
1441 acceptStoppedFn_ = fn;
// Records an accept event (fd and client address), then runs the hook if set.
1444 void connectionAccepted(
1445 int fd, const folly::SocketAddress& clientAddr) noexcept override {
1446 events_.emplace_back(fd, clientAddr);
1448 if (connectionAcceptedFn_) {
1449 connectionAcceptedFn_(fd, clientAddr);
// Records an error event carrying ex.what(), then checks for the error hook.
1452 void acceptError(const std::exception& ex) noexcept override {
1453 events_.emplace_back(ex.what());
1455 if (acceptErrorFn_) {
// Records a TYPE_START event, then checks for the started hook.
1459 void acceptStarted() noexcept override {
1460 events_.emplace_back(TYPE_START);
1462 if (acceptStartedFn_) {
// Records a TYPE_STOP event, then checks for the stopped hook.
1466 void acceptStopped() noexcept override {
1467 events_.emplace_back(TYPE_STOP);
1469 if (acceptStoppedFn_) {
// Optional per-callback hooks; an empty std::function means "no hook".
1475 std::function<void(int, const folly::SocketAddress&)> connectionAcceptedFn_;
1476 std::function<void(const std::exception&)> acceptErrorFn_;
1477 std::function<void()> acceptStartedFn_;
1478 std::function<void()> acceptStoppedFn_;
// Events in the order the callbacks were invoked.
1480 std::deque<EventInfo> events_;
1484 * Make sure accepted sockets have O_NONBLOCK and TCP_NODELAY set
// Accepts one connection through an AsyncServerSocket and inspects the
// accepted fd: the O_NONBLOCK file status flag is checked with fcntl() and the
// TCP_NODELAY option is queried with getsockopt().
1486 TEST(AsyncSocketTest, ServerAcceptOptions) {
1487 EventBase eventBase;
1489 // Create a server socket
1490 std::shared_ptr<AsyncServerSocket> serverSocket(
1491 AsyncServerSocket::newSocket(&eventBase));
1492 serverSocket->bind(0);
1493 serverSocket->listen(16);
1494 folly::SocketAddress serverAddress;
1495 serverSocket->getAddress(&serverAddress);
1497 // Add a callback to accept one connection then stop the loop
1498 TestAcceptCallback acceptCallback;
// Both the accept hook and the error hook unregister the callback.
1499 acceptCallback.setConnectionAcceptedFn(
1500 [&](int fd, const folly::SocketAddress& addr) {
1501 serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
1503 acceptCallback.setAcceptErrorFn([&](const std::exception& ex) {
1504 serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
1506 serverSocket->addAcceptCallback(&acceptCallback, nullptr);
1507 serverSocket->startAccepting();
1509 // Connect to the server socket
1510 std::shared_ptr<AsyncSocket> socket(
1511 AsyncSocket::newSocket(&eventBase, serverAddress));
1515 // Verify that the server accepted a connection
// Expected sequence: TYPE_START, TYPE_ACCEPT, TYPE_STOP.
1516 CHECK_EQ(acceptCallback.getEvents()->size(), 3);
1517 CHECK_EQ(acceptCallback.getEvents()->at(0).type,
1518 TestAcceptCallback::TYPE_START);
1519 CHECK_EQ(acceptCallback.getEvents()->at(1).type,
1520 TestAcceptCallback::TYPE_ACCEPT);
1521 CHECK_EQ(acceptCallback.getEvents()->at(2).type,
1522 TestAcceptCallback::TYPE_STOP);
// The accepted fd was recorded in the TYPE_ACCEPT event.
1523 int fd = acceptCallback.getEvents()->at(1).fd;
1525 // The accepted connection should already be in non-blocking mode
1526 int flags = fcntl(fd, F_GETFL, 0);
1527 CHECK_EQ(flags & O_NONBLOCK, O_NONBLOCK);
1530 // The accepted connection should already have TCP_NODELAY set
1532 socklen_t valueLength = sizeof(value);
1533 int rc = getsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &value, &valueLength);
1540 * Test AsyncServerSocket::removeAcceptCallback()
// Registers seven accept callbacks on one AsyncServerSocket and has some of
// them remove other callbacks (or themselves) while connections are being
// accepted, then checks the exact event sequence each callback recorded.
1542 TEST(AsyncSocketTest, RemoveAcceptCallback) {
1543 // Create a new AsyncServerSocket
1544 EventBase eventBase;
1545 std::shared_ptr<AsyncServerSocket> serverSocket(
1546 AsyncServerSocket::newSocket(&eventBase));
1547 serverSocket->bind(0);
1548 serverSocket->listen(16);
1549 folly::SocketAddress serverAddress;
1550 serverSocket->getAddress(&serverAddress);
1552 // Add several accept callbacks
1553 TestAcceptCallback cb1;
1554 TestAcceptCallback cb2;
1555 TestAcceptCallback cb3;
1556 TestAcceptCallback cb4;
1557 TestAcceptCallback cb5;
1558 TestAcceptCallback cb6;
1559 TestAcceptCallback cb7;
1561 // Test having callbacks remove other callbacks before them on the list,
1562 // after them on the list, or removing themselves.
1564 // Have callback 2 remove callback 3 and callback 5 the first time it is called.
// The trailing comments on the newSocket() calls name the callback expected
// to receive that connection and what that callback then does.
1567 cb1.setConnectionAcceptedFn([&](int fd, const folly::SocketAddress& addr){
1568 std::shared_ptr<AsyncSocket> sock2(
1569 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb2: -cb3 -cb5
1571 cb3.setConnectionAcceptedFn([&](int fd, const folly::SocketAddress& addr){
1573 cb4.setConnectionAcceptedFn([&](int fd, const folly::SocketAddress& addr){
1574 std::shared_ptr<AsyncSocket> sock3(
1575 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb4
1577 cb5.setConnectionAcceptedFn([&](int fd, const folly::SocketAddress& addr){
1578 std::shared_ptr<AsyncSocket> sock5(
1579 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb7: -cb7
1582 cb2.setConnectionAcceptedFn(
1583 [&](int fd, const folly::SocketAddress& addr) {
1584 if (cb2Count == 0) {
1585 serverSocket->removeAcceptCallback(&cb3, nullptr);
1586 serverSocket->removeAcceptCallback(&cb5, nullptr);
1590 // Have callback 6 remove callback 4 the first time it is called,
1591 // and destroy the server socket the second time it is called
1593 cb6.setConnectionAcceptedFn(
1594 [&](int fd, const folly::SocketAddress& addr) {
1595 if (cb6Count == 0) {
1596 serverSocket->removeAcceptCallback(&cb4, nullptr);
1597 std::shared_ptr<AsyncSocket> sock6(
1598 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
1599 std::shared_ptr<AsyncSocket> sock7(
1600 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb2
1601 std::shared_ptr<AsyncSocket> sock8(
1602 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb6: stop
1605 serverSocket.reset();
1609 // Have callback 7 remove itself
1610 cb7.setConnectionAcceptedFn(
1611 [&](int fd, const folly::SocketAddress& addr) {
1612 serverSocket->removeAcceptCallback(&cb7, nullptr);
// Registration order matters for the expectations checked below.
1615 serverSocket->addAcceptCallback(&cb1, nullptr);
1616 serverSocket->addAcceptCallback(&cb2, nullptr);
1617 serverSocket->addAcceptCallback(&cb3, nullptr);
1618 serverSocket->addAcceptCallback(&cb4, nullptr);
1619 serverSocket->addAcceptCallback(&cb5, nullptr);
1620 serverSocket->addAcceptCallback(&cb6, nullptr);
1621 serverSocket->addAcceptCallback(&cb7, nullptr);
1622 serverSocket->startAccepting();
1624 // Make several connections to the socket
1625 std::shared_ptr<AsyncSocket> sock1(
1626 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
1627 std::shared_ptr<AsyncSocket> sock4(
1628 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb6: -cb4
1630 // Loop until we are stopped
1633 // Check to make sure that the expected callbacks were invoked.
1635 // NOTE: This code depends on the AsyncServerSocket calling all of
1636 // the AcceptCallbacks in round-robin fashion, in the order that they were
1637 // added. The code is implemented this way right now, but the API doesn't
1638 // explicitly require it be done this way. If we change the code not to be
1639 // exactly round robin in the future, we can simplify the test checks here.
1640 // (We'll also need to update the termination code, since we expect cb6 to
1641 // get called twice to terminate the loop.)
// cb1: start, two accepts, stop.
1642 CHECK_EQ(cb1.getEvents()->size(), 4);
1643 CHECK_EQ(cb1.getEvents()->at(0).type,
1644 TestAcceptCallback::TYPE_START);
1645 CHECK_EQ(cb1.getEvents()->at(1).type,
1646 TestAcceptCallback::TYPE_ACCEPT);
1647 CHECK_EQ(cb1.getEvents()->at(2).type,
1648 TestAcceptCallback::TYPE_ACCEPT);
1649 CHECK_EQ(cb1.getEvents()->at(3).type,
1650 TestAcceptCallback::TYPE_STOP);
// cb2: start, two accepts, stop.
1652 CHECK_EQ(cb2.getEvents()->size(), 4);
1653 CHECK_EQ(cb2.getEvents()->at(0).type,
1654 TestAcceptCallback::TYPE_START);
1655 CHECK_EQ(cb2.getEvents()->at(1).type,
1656 TestAcceptCallback::TYPE_ACCEPT);
1657 CHECK_EQ(cb2.getEvents()->at(2).type,
1658 TestAcceptCallback::TYPE_ACCEPT);
1659 CHECK_EQ(cb2.getEvents()->at(3).type,
1660 TestAcceptCallback::TYPE_STOP);
// cb3: start and stop only, no accepts.
1662 CHECK_EQ(cb3.getEvents()->size(), 2);
1663 CHECK_EQ(cb3.getEvents()->at(0).type,
1664 TestAcceptCallback::TYPE_START);
1665 CHECK_EQ(cb3.getEvents()->at(1).type,
1666 TestAcceptCallback::TYPE_STOP);
// cb4: start, one accept, stop.
1668 CHECK_EQ(cb4.getEvents()->size(), 3);
1669 CHECK_EQ(cb4.getEvents()->at(0).type,
1670 TestAcceptCallback::TYPE_START);
1671 CHECK_EQ(cb4.getEvents()->at(1).type,
1672 TestAcceptCallback::TYPE_ACCEPT);
1673 CHECK_EQ(cb4.getEvents()->at(2).type,
1674 TestAcceptCallback::TYPE_STOP);
// cb5: start and stop only, no accepts.
1676 CHECK_EQ(cb5.getEvents()->size(), 2);
1677 CHECK_EQ(cb5.getEvents()->at(0).type,
1678 TestAcceptCallback::TYPE_START);
1679 CHECK_EQ(cb5.getEvents()->at(1).type,
1680 TestAcceptCallback::TYPE_STOP);
// cb6: start, two accepts, stop.
1682 CHECK_EQ(cb6.getEvents()->size(), 4);
1683 CHECK_EQ(cb6.getEvents()->at(0).type,
1684 TestAcceptCallback::TYPE_START);
1685 CHECK_EQ(cb6.getEvents()->at(1).type,
1686 TestAcceptCallback::TYPE_ACCEPT);
1687 CHECK_EQ(cb6.getEvents()->at(2).type,
1688 TestAcceptCallback::TYPE_ACCEPT);
1689 CHECK_EQ(cb6.getEvents()->at(3).type,
1690 TestAcceptCallback::TYPE_STOP);
// cb7: start, one accept, stop.
1692 CHECK_EQ(cb7.getEvents()->size(), 3);
1693 CHECK_EQ(cb7.getEvents()->at(0).type,
1694 TestAcceptCallback::TYPE_START);
1695 CHECK_EQ(cb7.getEvents()->at(1).type,
1696 TestAcceptCallback::TYPE_ACCEPT);
1697 CHECK_EQ(cb7.getEvents()->at(2).type,
1698 TestAcceptCallback::TYPE_STOP);
1702 * Test that accept callbacks are invoked on the thread that runs the EventBase loop, not the thread that registered them
// Registers one accept callback from the test's own thread and drives the
// event loop from a second thread. The hooks check which thread invokes them:
// acceptStarted must run on a thread other than the registering one, and the
// accept and stop hooks must run on that same other thread.
1704 TEST(AsyncSocketTest, OtherThreadAcceptCallback) {
1705 // Create a new AsyncServerSocket
1706 EventBase eventBase;
1707 std::shared_ptr<AsyncServerSocket> serverSocket(
1708 AsyncServerSocket::newSocket(&eventBase));
1709 serverSocket->bind(0);
1710 serverSocket->listen(16);
1711 folly::SocketAddress serverAddress;
1712 serverSocket->getAddress(&serverAddress);
1714 // Add a single accept callback that checks which thread invokes it
1715 TestAcceptCallback cb1;
// thread_id starts as the registering thread and is overwritten with the
// invoking thread's id the first time acceptStarted runs.
1716 auto thread_id = pthread_self();
1717 cb1.setAcceptStartedFn([&](){
1718 CHECK_NE(thread_id, pthread_self());
1719 thread_id = pthread_self();
1721 cb1.setConnectionAcceptedFn([&](int fd, const folly::SocketAddress& addr){
1722 CHECK_EQ(thread_id, pthread_self());
1723 serverSocket->removeAcceptCallback(&cb1, nullptr);
1725 cb1.setAcceptStoppedFn([&](){
1726 CHECK_EQ(thread_id, pthread_self());
1729 // Register the callback and start accepting
1730 serverSocket->addAcceptCallback(&cb1, nullptr);
1731 serverSocket->startAccepting();
1733 // Make one connection to the socket
1734 std::shared_ptr<AsyncSocket> sock1(
1735 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
1737 // Loop in another thread
1738 auto other = std::thread([&](){
1743 // Check to make sure that the expected callbacks were invoked.
1745 // NOTE: Only cb1 is registered in this test, so the order in which the
1746 // AsyncServerSocket cycles through its AcceptCallbacks does not matter
1747 // here. cb1 is expected to record exactly three events: TYPE_START from
1748 // acceptStarted(), TYPE_ACCEPT for the single connection made above, and
1749 // TYPE_STOP from acceptStopped(). The thread checks themselves are made
1750 // inside the hooks installed above (CHECK_NE in the started hook,
1751 // CHECK_EQ in the accepted and stopped hooks).
1752 CHECK_EQ(cb1.getEvents()->size(), 3);
1753 CHECK_EQ(cb1.getEvents()->at(0).type,
1754 TestAcceptCallback::TYPE_START);
1755 CHECK_EQ(cb1.getEvents()->at(1).type,
1756 TestAcceptCallback::TYPE_ACCEPT);
1757 CHECK_EQ(cb1.getEvents()->at(2).type,
1758 TestAcceptCallback::TYPE_STOP);
// Shared helper: registers a one-shot accept callback on the given server
// socket, connects a client to the address the socket reports, and checks that
// the callback recorded TYPE_START, TYPE_ACCEPT and TYPE_STOP, in that order.
//
// serverSocket: the server socket under test; its EventBase (getEventBase())
//               and address (getAddress()) are used for the client
//               connection.
1762 void serverSocketSanityTest(AsyncServerSocket* serverSocket) {
1763 // Add a callback to accept one connection then stop accepting
1764 TestAcceptCallback acceptCallback;
// Both the accept hook and the error hook unregister the callback.
1765 acceptCallback.setConnectionAcceptedFn(
1766 [&](int fd, const folly::SocketAddress& addr) {
1767 serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
1769 acceptCallback.setAcceptErrorFn([&](const std::exception& ex) {
1770 serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
1772 serverSocket->addAcceptCallback(&acceptCallback, nullptr);
1773 serverSocket->startAccepting();
1775 // Connect to the server socket
1776 EventBase* eventBase = serverSocket->getEventBase();
1777 folly::SocketAddress serverAddress;
1778 serverSocket->getAddress(&serverAddress);
1779 AsyncSocket::UniquePtr socket(new AsyncSocket(eventBase, serverAddress));
1781 // Loop to process all events
1784 // Verify that the server accepted a connection
1785 CHECK_EQ(acceptCallback.getEvents()->size(), 3);
1786 CHECK_EQ(acceptCallback.getEvents()->at(0).type,
1787 TestAcceptCallback::TYPE_START);
1788 CHECK_EQ(acceptCallback.getEvents()->at(1).type,
1789 TestAcceptCallback::TYPE_ACCEPT);
1790 CHECK_EQ(acceptCallback.getEvents()->at(2).type,
1791 TestAcceptCallback::TYPE_STOP);
1794 /* Verify that we don't leak sockets if we are destroyed()
1795 * and there are still writes pending
1797 * If destroy() only calls close() instead of closeNow(),
1798 * it would shutdown(writes) on the socket, but it would
1799 * never be close()'d, and the socket would leak
// Writes a 5,000,000-byte buffer on the accepted socket, drops the last
// shared_ptr reference to that socket, and then calls read() on the fd number
// that was saved beforehand to test whether the descriptor was closed.
1801 TEST(AsyncSocketTest, DestroyCloseTest) {
1807 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&clientEB);
1809 socket->connect(&ccb, server.getAddress(), 30);
1811 // Accept the connection
1812 std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&serverEB);
1814 acceptedSocket->setReadCB(&rcb);
1816 // Write a large buffer to the socket that is larger than kernel buffer
1817 size_t simpleBufLength = 5000000;
1818 char* simpleBuf = new char[simpleBufLength];
1819 memset(simpleBuf, 'a', simpleBufLength);
1822 // Let the reads and writes run to completion
// Save the raw fd now; it is the only handle left once acceptedSocket is reset.
1823 int fd = acceptedSocket->getFd();
1825 acceptedSocket->write(&wcb, simpleBuf, simpleBufLength);
1827 acceptedSocket.reset();
1829 // Test that server socket was closed
1830 ssize_t sz = read(fd, simpleBuf, simpleBufLength);
1837 * Test AsyncServerSocket::useExistingSocket()
// Exercises useExistingSocket() with a manually created TCP socket in three
// states: freshly created, already bound, and already bound and listening.
// Each case ends by running serverSocketSanityTest() on the server socket.
1839 TEST(AsyncSocketTest, ServerExistingSocket) {
1840 EventBase eventBase;
1842 // Test creating a socket, and letting AsyncServerSocket bind and listen
1844 // Manually create a socket
1845 int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
1848 // Create a server socket
1849 AsyncServerSocket::UniquePtr serverSocket(
1850 new AsyncServerSocket(&eventBase));
1851 serverSocket->useExistingSocket(fd);
// The address reported for the adopted socket is the one passed to bind().
1852 folly::SocketAddress address;
1853 serverSocket->getAddress(&address);
1855 serverSocket->bind(address);
1856 serverSocket->listen(16);
1858 // Make sure the socket works
1859 serverSocketSanityTest(serverSocket.get());
1862 // Test creating a socket and binding manually,
1863 // then letting AsyncServerSocket listen
1865 // Manually create a socket
1866 int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
1869 struct sockaddr_in addr;
1870 addr.sin_family = AF_INET;
1872 addr.sin_addr.s_addr = INADDR_ANY;
1873 CHECK_EQ(bind(fd, reinterpret_cast<struct sockaddr*>(&addr),
1875 // Look up the address that we bound to
1876 folly::SocketAddress boundAddress;
1877 boundAddress.setFromLocalAddress(fd);
1879 // Create a server socket
1880 AsyncServerSocket::UniquePtr serverSocket(
1881 new AsyncServerSocket(&eventBase));
1882 serverSocket->useExistingSocket(fd);
1883 serverSocket->listen(16);
1885 // Make sure AsyncServerSocket reports the same address that we bound to
1886 folly::SocketAddress serverSocketAddress;
1887 serverSocket->getAddress(&serverSocketAddress);
1888 CHECK_EQ(boundAddress, serverSocketAddress);
1890 // Make sure the socket works
1891 serverSocketSanityTest(serverSocket.get());
1894 // Test creating a socket, binding and listening manually,
1895 // then giving it to AsyncServerSocket
1897 // Manually create a socket
1898 int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
1901 struct sockaddr_in addr;
1902 addr.sin_family = AF_INET;
1904 addr.sin_addr.s_addr = INADDR_ANY;
1905 CHECK_EQ(bind(fd, reinterpret_cast<struct sockaddr*>(&addr),
1907 // Look up the address that we bound to
1908 folly::SocketAddress boundAddress;
1909 boundAddress.setFromLocalAddress(fd);
1911 CHECK_EQ(listen(fd, 16), 0);
1913 // Create a server socket
1914 AsyncServerSocket::UniquePtr serverSocket(
1915 new AsyncServerSocket(&eventBase));
1916 serverSocket->useExistingSocket(fd);
1918 // Make sure AsyncServerSocket reports the same address that we bound to
1919 folly::SocketAddress serverSocketAddress;
1920 serverSocket->getAddress(&serverSocketAddress);
1921 CHECK_EQ(boundAddress, serverSocketAddress);
1923 // Make sure the socket works
1924 serverSocketSanityTest(serverSocket.get());
// Binds an AsyncServerSocket to a path-based address (a path ending in
// "/anonymous"), accepts one connection on it, and checks the recorded event
// sequence and that the accepted fd has O_NONBLOCK set.
1928 TEST(AsyncSocketTest, UnixDomainSocketTest) {
1929 EventBase eventBase;
1931 // Create a server socket
1932 std::shared_ptr<AsyncServerSocket> serverSocket(
1933 AsyncServerSocket::newSocket(&eventBase));
1935 path.append("/anonymous");
1936 folly::SocketAddress serverAddress;
1937 serverAddress.setFromPath(path);
1938 serverSocket->bind(serverAddress);
1939 serverSocket->listen(16);
1941 // Add a callback to accept one connection then stop the loop
1942 TestAcceptCallback acceptCallback;
// Both the accept hook and the error hook unregister the callback.
1943 acceptCallback.setConnectionAcceptedFn(
1944 [&](int fd, const folly::SocketAddress& addr) {
1945 serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
1947 acceptCallback.setAcceptErrorFn([&](const std::exception& ex) {
1948 serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
1950 serverSocket->addAcceptCallback(&acceptCallback, nullptr);
1951 serverSocket->startAccepting();
1953 // Connect to the server socket
1954 std::shared_ptr<AsyncSocket> socket(
1955 AsyncSocket::newSocket(&eventBase, serverAddress));
1959 // Verify that the server accepted a connection
// Expected sequence: TYPE_START, TYPE_ACCEPT, TYPE_STOP.
1960 CHECK_EQ(acceptCallback.getEvents()->size(), 3);
1961 CHECK_EQ(acceptCallback.getEvents()->at(0).type,
1962 TestAcceptCallback::TYPE_START);
1963 CHECK_EQ(acceptCallback.getEvents()->at(1).type,
1964 TestAcceptCallback::TYPE_ACCEPT);
1965 CHECK_EQ(acceptCallback.getEvents()->at(2).type,
1966 TestAcceptCallback::TYPE_STOP);
// The accepted fd was recorded in the TYPE_ACCEPT event.
1967 int fd = acceptCallback.getEvents()->at(1).fd;
1969 // The accepted connection should already be in non-blocking mode
1970 int flags = fcntl(fd, F_GETFL, 0);
1971 CHECK_EQ(flags & O_NONBLOCK, O_NONBLOCK);