2 * Copyright 2015 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 #include <folly/io/async/AsyncServerSocket.h>
17 #include <folly/io/async/AsyncSocket.h>
18 #include <folly/io/async/AsyncTimeout.h>
19 #include <folly/io/async/EventBase.h>
20 #include <folly/SocketAddress.h>
22 #include <folly/io/IOBuf.h>
23 #include <folly/io/async/test/BlockingSocket.h>
24 #include <folly/io/async/test/Util.h>
26 #include <gtest/gtest.h>
27 #include <boost/scoped_array.hpp>
32 #include <sys/types.h>
33 #include <sys/socket.h>
34 #include <netinet/tcp.h>
37 using namespace boost;
44 using std::unique_ptr;
45 using std::chrono::milliseconds;
46 using boost::scoped_array;
48 using namespace folly;
56 typedef std::function<void()> VoidCallback;
59 class ConnCallback : public AsyncSocket::ConnectCallback {
62 : state(STATE_WAITING)
63 , exception(AsyncSocketException::UNKNOWN, "none") {}
65 void connectSuccess() noexcept override {
66 state = STATE_SUCCEEDED;
67 if (successCallback) {
72 void connectErr(const AsyncSocketException& ex) noexcept override {
81 AsyncSocketException exception;
82 VoidCallback successCallback;
83 VoidCallback errorCallback;
86 class WriteCallback : public AsyncTransportWrapper::WriteCallback {
89 : state(STATE_WAITING)
91 , exception(AsyncSocketException::UNKNOWN, "none") {}
93 void writeSuccess() noexcept override {
94 state = STATE_SUCCEEDED;
95 if (successCallback) {
100 void writeErr(size_t bytesWritten,
101 const AsyncSocketException& ex) noexcept override {
102 state = STATE_FAILED;
103 this->bytesWritten = bytesWritten;
112 AsyncSocketException exception;
113 VoidCallback successCallback;
114 VoidCallback errorCallback;
117 class ReadCallback : public AsyncTransportWrapper::ReadCallback {
120 : state(STATE_WAITING)
121 , exception(AsyncSocketException::UNKNOWN, "none")
125 for (vector<Buffer>::iterator it = buffers.begin();
130 currentBuffer.free();
133 void getReadBuffer(void** bufReturn, size_t* lenReturn) override {
134 if (!currentBuffer.buffer) {
135 currentBuffer.allocate(4096);
137 *bufReturn = currentBuffer.buffer;
138 *lenReturn = currentBuffer.length;
141 void readDataAvailable(size_t len) noexcept override {
142 currentBuffer.length = len;
143 buffers.push_back(currentBuffer);
144 currentBuffer.reset();
145 if (dataAvailableCallback) {
146 dataAvailableCallback();
150 void readEOF() noexcept override {
151 state = STATE_SUCCEEDED;
154 void readErr(const AsyncSocketException& ex) noexcept override {
155 state = STATE_FAILED;
159 void verifyData(const char* expected, size_t expectedLen) const {
161 for (size_t idx = 0; idx < buffers.size(); ++idx) {
162 const auto& buf = buffers[idx];
163 size_t cmpLen = std::min(buf.length, expectedLen - offset);
164 CHECK_EQ(memcmp(buf.buffer, expected + offset, cmpLen), 0);
165 CHECK_EQ(cmpLen, buf.length);
168 CHECK_EQ(offset, expectedLen);
173 Buffer() : buffer(nullptr), length(0) {}
174 Buffer(char* buf, size_t len) : buffer(buf), length(len) {}
180 void allocate(size_t length) {
181 assert(buffer == nullptr);
182 this->buffer = static_cast<char*>(malloc(length));
183 this->length = length;
195 AsyncSocketException exception;
196 vector<Buffer> buffers;
197 Buffer currentBuffer;
198 VoidCallback dataAvailableCallback;
206 // Create a TestServer.
207 // This immediately starts listening on an ephemeral port.
210 fd_ = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
212 throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
213 "failed to create test server socket", errno);
215 if (fcntl(fd_, F_SETFL, O_NONBLOCK) != 0) {
216 throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
217 "failed to put test server socket in "
218 "non-blocking mode", errno);
220 if (listen(fd_, 10) != 0) {
221 throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
222 "failed to listen on test server socket",
226 address_.setFromLocalAddress(fd_);
227 // The local address will contain 0.0.0.0.
228 // Change it to 127.0.0.1, so it can be used to connect to the server
229 address_.setFromIpPort("127.0.0.1", address_.getPort());
232 // Get the address for connecting to the server
233 const folly::SocketAddress& getAddress() const {
237 int acceptFD(int timeout=50) {
241 int ret = poll(&pfd, 1, timeout);
243 throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
244 "test server accept() timed out");
245 } else if (ret < 0) {
246 throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
247 "test server accept() poll failed", errno);
250 int acceptedFd = ::accept(fd_, nullptr, nullptr);
251 if (acceptedFd < 0) {
252 throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
253 "test server accept() failed", errno);
259 std::shared_ptr<BlockingSocket> accept(int timeout=50) {
260 int fd = acceptFD(timeout);
261 return std::shared_ptr<BlockingSocket>(new BlockingSocket(fd));
264 std::shared_ptr<AsyncSocket> acceptAsync(EventBase* evb, int timeout=50) {
265 int fd = acceptFD(timeout);
266 return AsyncSocket::newSocket(evb, fd);
270 * Accept a connection, read data from it, and verify that it matches the
271 * data in the specified buffer.
273 void verifyConnection(const char* buf, size_t len) {
274 // accept a connection
275 std::shared_ptr<BlockingSocket> acceptedSocket = accept();
276 // read the data and compare it to the specified buffer
277 scoped_array<uint8_t> readbuf(new uint8_t[len]);
278 acceptedSocket->readAll(readbuf.get(), len);
279 CHECK_EQ(memcmp(buf, readbuf.get(), len), 0);
280 // make sure we get EOF next
281 uint32_t bytesRead = acceptedSocket->read(readbuf.get(), len);
282 CHECK_EQ(bytesRead, 0);
287 folly::SocketAddress address_;
290 class DelayedWrite: public AsyncTimeout {
292 DelayedWrite(const std::shared_ptr<AsyncSocket>& socket,
293 unique_ptr<IOBuf>&& bufs, AsyncTransportWrapper::WriteCallback* wcb,
294 bool cork, bool lastWrite = false):
295 AsyncTimeout(socket->getEventBase()),
297 bufs_(std::move(bufs)),
300 lastWrite_(lastWrite) {}
303 void timeoutExpired() noexcept override {
304 WriteFlags flags = cork_ ? WriteFlags::CORK : WriteFlags::NONE;
305 socket_->writeChain(wcb_, std::move(bufs_), flags);
307 socket_->shutdownWrite();
311 std::shared_ptr<AsyncSocket> socket_;
312 unique_ptr<IOBuf> bufs_;
313 AsyncTransportWrapper::WriteCallback* wcb_;
318 ///////////////////////////////////////////////////////////////////////////
320 ///////////////////////////////////////////////////////////////////////////
323 * Test connecting to a server
325 TEST(AsyncSocketTest, Connect) {
326 // Start listening on a local port
329 // Connect using an AsyncSocket
331 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
333 socket->connect(&cb, server.getAddress(), 30);
337 CHECK_EQ(cb.state, STATE_SUCCEEDED);
341 * Test connecting to a server that isn't listening
343 TEST(AsyncSocketTest, ConnectRefused) {
346 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
348 // Hopefully nothing is actually listening on this address
349 folly::SocketAddress addr("127.0.0.1", 65535);
351 socket->connect(&cb, addr, 30);
355 CHECK_EQ(cb.state, STATE_FAILED);
356 CHECK_EQ(cb.exception.getType(), AsyncSocketException::NOT_OPEN);
360 * Test connection timeout
362 TEST(AsyncSocketTest, ConnectTimeout) {
365 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
367 // Try connecting to server that won't respond.
369 // This depends somewhat on the network where this test is run.
370 // Hopefully this IP will be routable but unresponsive.
371 // (Alternatively, we could try listening on a local raw socket, but that
372 // normally requires root privileges.)
373 folly::SocketAddress addr("8.8.8.8", 65535);
375 socket->connect(&cb, addr, 1); // also set a ridiculously small timeout
379 CHECK_EQ(cb.state, STATE_FAILED);
380 CHECK_EQ(cb.exception.getType(), AsyncSocketException::TIMED_OUT);
382 // Verify that we can still get the peer address after a timeout.
383 // Use case is if the client was created from a client pool, and we want
384 // to log which peer failed.
385 folly::SocketAddress peer;
386 socket->getPeerAddress(&peer);
387 CHECK_EQ(peer, addr);
391 * Test writing immediately after connecting, without waiting for connect
394 TEST(AsyncSocketTest, ConnectAndWrite) {
399 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
401 socket->connect(&ccb, server.getAddress(), 30);
405 memset(buf, 'a', sizeof(buf));
407 socket->write(&wcb, buf, sizeof(buf));
409 // Loop. We don't bother accepting on the server socket yet.
410 // The kernel should be able to buffer the write request so it can succeed.
413 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
414 CHECK_EQ(wcb.state, STATE_SUCCEEDED);
416 // Make sure the server got a connection and received the data
418 server.verifyConnection(buf, sizeof(buf));
422 * Test connecting using a nullptr connect callback.
424 TEST(AsyncSocketTest, ConnectNullCallback) {
429 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
430 socket->connect(nullptr, server.getAddress(), 30);
432 // write some data, just so we have some way of verifying
433 // that the socket works correctly after connecting
435 memset(buf, 'a', sizeof(buf));
437 socket->write(&wcb, buf, sizeof(buf));
441 CHECK_EQ(wcb.state, STATE_SUCCEEDED);
443 // Make sure the server got a connection and received the data
445 server.verifyConnection(buf, sizeof(buf));
449 * Test calling both write() and close() immediately after connecting, without
450 * waiting for connect to finish.
452 * This exercises the STATE_CONNECTING_CLOSING code.
454 TEST(AsyncSocketTest, ConnectWriteAndClose) {
459 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
461 socket->connect(&ccb, server.getAddress(), 30);
465 memset(buf, 'a', sizeof(buf));
467 socket->write(&wcb, buf, sizeof(buf));
472 // Loop. We don't bother accepting on the server socket yet.
473 // The kernel should be able to buffer the write request so it can succeed.
476 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
477 CHECK_EQ(wcb.state, STATE_SUCCEEDED);
479 // Make sure the server got a connection and received the data
480 server.verifyConnection(buf, sizeof(buf));
484 * Test calling close() immediately after connect()
486 TEST(AsyncSocketTest, ConnectAndClose) {
489 // Connect using an AsyncSocket
491 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
493 socket->connect(&ccb, server.getAddress(), 30);
495 // Hopefully the connect didn't succeed immediately.
496 // If it did, we can't exercise the close-while-connecting code path.
497 if (ccb.state == STATE_SUCCEEDED) {
498 LOG(INFO) << "connect() succeeded immediately; aborting test "
499 "of close-during-connect behavior";
505 // Loop, although there shouldn't be anything to do.
508 // Make sure the connection was aborted
509 CHECK_EQ(ccb.state, STATE_FAILED);
513 * Test calling closeNow() immediately after connect()
515 * This should be identical to the normal close behavior.
517 TEST(AsyncSocketTest, ConnectAndCloseNow) {
520 // Connect using an AsyncSocket
522 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
524 socket->connect(&ccb, server.getAddress(), 30);
526 // Hopefully the connect didn't succeed immediately.
527 // If it did, we can't exercise the close-while-connecting code path.
528 if (ccb.state == STATE_SUCCEEDED) {
529 LOG(INFO) << "connect() succeeded immediately; aborting test "
530 "of closeNow()-during-connect behavior";
536 // Loop, although there shouldn't be anything to do.
539 // Make sure the connection was aborted
540 CHECK_EQ(ccb.state, STATE_FAILED);
544 * Test calling both write() and closeNow() immediately after connecting,
545 * without waiting for connect to finish.
547 * This should abort the pending write.
549 TEST(AsyncSocketTest, ConnectWriteAndCloseNow) {
554 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
556 socket->connect(&ccb, server.getAddress(), 30);
558 // Hopefully the connect didn't succeed immediately.
559 // If it did, we can't exercise the close-while-connecting code path.
560 if (ccb.state == STATE_SUCCEEDED) {
561 LOG(INFO) << "connect() succeeded immediately; aborting test "
562 "of write-during-connect behavior";
568 memset(buf, 'a', sizeof(buf));
570 socket->write(&wcb, buf, sizeof(buf));
575 // Loop, although there shouldn't be anything to do.
578 CHECK_EQ(ccb.state, STATE_FAILED);
579 CHECK_EQ(wcb.state, STATE_FAILED);
583 * Test installing a read callback immediately, before connect() finishes.
585 TEST(AsyncSocketTest, ConnectAndRead) {
590 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
592 socket->connect(&ccb, server.getAddress(), 30);
595 socket->setReadCB(&rcb);
597 // Even though we haven't looped yet, we should be able to accept
598 // the connection and send data to it.
599 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
601 memset(buf, 'a', sizeof(buf));
602 acceptedSocket->write(buf, sizeof(buf));
603 acceptedSocket->flush();
604 acceptedSocket->close();
606 // Loop so the connect completes and the read callback sees the data and EOF.
609 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
610 CHECK_EQ(rcb.state, STATE_SUCCEEDED);
611 CHECK_EQ(rcb.buffers.size(), 1);
612 CHECK_EQ(rcb.buffers[0].length, sizeof(buf));
613 CHECK_EQ(memcmp(rcb.buffers[0].buffer, buf, sizeof(buf)), 0);
617 * Test installing a read callback and then closing immediately before the
618 * connect attempt finishes.
620 TEST(AsyncSocketTest, ConnectReadAndClose) {
625 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
627 socket->connect(&ccb, server.getAddress(), 30);
629 // Hopefully the connect didn't succeed immediately.
630 // If it did, we can't exercise the close-while-connecting code path.
631 if (ccb.state == STATE_SUCCEEDED) {
632 LOG(INFO) << "connect() succeeded immediately; aborting test "
633 "of read-during-connect behavior";
638 socket->setReadCB(&rcb);
643 // Loop, although there shouldn't be anything to do.
646 CHECK_EQ(ccb.state, STATE_FAILED); // we aborted the connect attempt
647 CHECK_EQ(rcb.buffers.size(), 0);
648 CHECK_EQ(rcb.state, STATE_SUCCEEDED); // this indicates EOF
652 * Test both writing and installing a read callback immediately,
653 * before connect() finishes.
655 TEST(AsyncSocketTest, ConnectWriteAndRead) {
660 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
662 socket->connect(&ccb, server.getAddress(), 30);
666 memset(buf1, 'a', sizeof(buf1));
668 socket->write(&wcb, buf1, sizeof(buf1));
670 // set a read callback
672 socket->setReadCB(&rcb);
674 // Even though we haven't looped yet, we should be able to accept
675 // the connection and send data to it.
676 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
678 memset(buf2, 'b', sizeof(buf2));
679 acceptedSocket->write(buf2, sizeof(buf2));
680 acceptedSocket->flush();
682 // shut down the write half of acceptedSocket, so that the AsyncSocket
683 // will stop reading and we can break out of the event loop.
684 shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
689 // Make sure the connect succeeded
690 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
692 // Make sure the AsyncSocket read the data written by the accepted socket
693 CHECK_EQ(rcb.state, STATE_SUCCEEDED);
694 CHECK_EQ(rcb.buffers.size(), 1);
695 CHECK_EQ(rcb.buffers[0].length, sizeof(buf2));
696 CHECK_EQ(memcmp(rcb.buffers[0].buffer, buf2, sizeof(buf2)), 0);
698 // Close the AsyncSocket so we'll see EOF on acceptedSocket
701 // Make sure the accepted socket saw the data written by the AsyncSocket
702 uint8_t readbuf[sizeof(buf1)];
703 acceptedSocket->readAll(readbuf, sizeof(readbuf));
704 CHECK_EQ(memcmp(buf1, readbuf, sizeof(buf1)), 0);
705 uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
706 CHECK_EQ(bytesRead, 0);
710 * Test writing to the socket then shutting down writes before the connect
713 TEST(AsyncSocketTest, ConnectWriteAndShutdownWrite) {
718 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
720 socket->connect(&ccb, server.getAddress(), 30);
722 // Hopefully the connect didn't succeed immediately.
723 // If it did, we can't exercise the write-while-connecting code path.
724 if (ccb.state == STATE_SUCCEEDED) {
725 LOG(INFO) << "connect() succeeded immediately; skipping test";
729 // Ask to write some data
731 memset(wbuf, 'a', sizeof(wbuf));
733 socket->write(&wcb, wbuf, sizeof(wbuf));
734 socket->shutdownWrite();
737 socket->shutdownWrite();
739 // Even though we haven't looped yet, we should be able to accept
741 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
743 // Since the connection is still in progress, there should be no data to
744 // read yet. Verify that the accepted socket is not readable.
745 struct pollfd fds[1];
746 fds[0].fd = acceptedSocket->getSocketFD();
747 fds[0].events = POLLIN;
749 int rc = poll(fds, 1, 0);
752 // Write data to the accepted socket
753 uint8_t acceptedWbuf[192];
754 memset(acceptedWbuf, 'b', sizeof(acceptedWbuf));
755 acceptedSocket->write(acceptedWbuf, sizeof(acceptedWbuf));
756 acceptedSocket->flush();
761 // The loop should have completed the connection, written the queued data,
762 // and shutdown writes on the socket.
764 // Check that the connection was completed successfully and that the write
765 // callback succeeded.
766 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
767 CHECK_EQ(wcb.state, STATE_SUCCEEDED);
769 // Check that we can read the data that was written to the socket, and that
770 // we see an EOF, since its socket was half-shutdown.
771 uint8_t readbuf[sizeof(wbuf)];
772 acceptedSocket->readAll(readbuf, sizeof(readbuf));
773 CHECK_EQ(memcmp(wbuf, readbuf, sizeof(wbuf)), 0);
774 uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
775 CHECK_EQ(bytesRead, 0);
777 // Close the accepted socket. This will cause the AsyncSocket to see EOF
778 // and uninstall its read callback when we loop next.
779 acceptedSocket->close();
781 // Install a read callback, then loop again.
783 socket->setReadCB(&rcb);
786 // This loop should have read the data and seen the EOF
787 CHECK_EQ(rcb.state, STATE_SUCCEEDED);
788 CHECK_EQ(rcb.buffers.size(), 1);
789 CHECK_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
790 CHECK_EQ(memcmp(rcb.buffers[0].buffer,
791 acceptedWbuf, sizeof(acceptedWbuf)), 0);
795 * Test reading, writing, and shutting down writes before the connect attempt
798 TEST(AsyncSocketTest, ConnectReadWriteAndShutdownWrite) {
803 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
805 socket->connect(&ccb, server.getAddress(), 30);
807 // Hopefully the connect didn't succeed immediately.
808 // If it did, we can't exercise the write-while-connecting code path.
809 if (ccb.state == STATE_SUCCEEDED) {
810 LOG(INFO) << "connect() succeeded immediately; skipping test";
814 // Install a read callback
816 socket->setReadCB(&rcb);
818 // Ask to write some data
820 memset(wbuf, 'a', sizeof(wbuf));
822 socket->write(&wcb, wbuf, sizeof(wbuf));
825 socket->shutdownWrite();
827 // Even though we haven't looped yet, we should be able to accept
829 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
831 // Since the connection is still in progress, there should be no data to
832 // read yet. Verify that the accepted socket is not readable.
833 struct pollfd fds[1];
834 fds[0].fd = acceptedSocket->getSocketFD();
835 fds[0].events = POLLIN;
837 int rc = poll(fds, 1, 0);
840 // Write data to the accepted socket
841 uint8_t acceptedWbuf[192];
842 memset(acceptedWbuf, 'b', sizeof(acceptedWbuf));
843 acceptedSocket->write(acceptedWbuf, sizeof(acceptedWbuf));
844 acceptedSocket->flush();
845 // Shut down writes on the accepted socket. This will cause the AsyncSocket
846 // to see EOF and uninstall its read callback.
847 ::shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
852 // The loop should have completed the connection, written the queued data,
853 // shutdown writes on the socket, read the data we wrote to it, and see the
856 // Check that the connection was completed successfully and that the read
857 // and write callbacks were invoked as expected.
858 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
859 CHECK_EQ(rcb.state, STATE_SUCCEEDED);
860 CHECK_EQ(rcb.buffers.size(), 1);
861 CHECK_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
862 CHECK_EQ(memcmp(rcb.buffers[0].buffer,
863 acceptedWbuf, sizeof(acceptedWbuf)), 0);
864 CHECK_EQ(wcb.state, STATE_SUCCEEDED);
866 // Check that we can read the data that was written to the socket, and that
867 // we see an EOF, since its socket was half-shutdown.
868 uint8_t readbuf[sizeof(wbuf)];
869 acceptedSocket->readAll(readbuf, sizeof(readbuf));
870 CHECK_EQ(memcmp(wbuf, readbuf, sizeof(wbuf)), 0);
871 uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
872 CHECK_EQ(bytesRead, 0);
874 // Fully close both sockets
875 acceptedSocket->close();
880 * Test reading, writing, and calling shutdownWriteNow() before the
881 * connect attempt finishes.
883 TEST(AsyncSocketTest, ConnectReadWriteAndShutdownWriteNow) {
888 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
890 socket->connect(&ccb, server.getAddress(), 30);
892 // Hopefully the connect didn't succeed immediately.
893 // If it did, we can't exercise the write-while-connecting code path.
894 if (ccb.state == STATE_SUCCEEDED) {
895 LOG(INFO) << "connect() succeeded immediately; skipping test";
899 // Install a read callback
901 socket->setReadCB(&rcb);
903 // Ask to write some data
905 memset(wbuf, 'a', sizeof(wbuf));
907 socket->write(&wcb, wbuf, sizeof(wbuf));
909 // Shutdown writes immediately.
910 // This should immediately discard the data that we just tried to write.
911 socket->shutdownWriteNow();
913 // Verify that writeErr() was invoked on the write callback.
914 CHECK_EQ(wcb.state, STATE_FAILED);
915 CHECK_EQ(wcb.bytesWritten, 0);
917 // Even though we haven't looped yet, we should be able to accept
919 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
921 // Since the connection is still in progress, there should be no data to
922 // read yet. Verify that the accepted socket is not readable.
923 struct pollfd fds[1];
924 fds[0].fd = acceptedSocket->getSocketFD();
925 fds[0].events = POLLIN;
927 int rc = poll(fds, 1, 0);
930 // Write data to the accepted socket
931 uint8_t acceptedWbuf[192];
932 memset(acceptedWbuf, 'b', sizeof(acceptedWbuf));
933 acceptedSocket->write(acceptedWbuf, sizeof(acceptedWbuf));
934 acceptedSocket->flush();
935 // Shut down writes on the accepted socket. This will cause the AsyncSocket
936 // to see EOF and uninstall its read callback.
937 ::shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
942 // The loop should have completed the connection, written the queued data,
943 // shutdown writes on the socket, read the data we wrote to it, and see the
946 // Check that the connection was completed successfully and that the read
947 // callback was invoked as expected.
948 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
949 CHECK_EQ(rcb.state, STATE_SUCCEEDED);
950 CHECK_EQ(rcb.buffers.size(), 1);
951 CHECK_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
952 CHECK_EQ(memcmp(rcb.buffers[0].buffer,
953 acceptedWbuf, sizeof(acceptedWbuf)), 0);
955 // Since we used shutdownWriteNow(), it should have discarded all pending
956 // write data. Verify we see an immediate EOF when reading from the accepted
958 uint8_t readbuf[sizeof(wbuf)];
959 uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
960 CHECK_EQ(bytesRead, 0);
962 // Fully close both sockets
963 acceptedSocket->close();
967 // Helper function for use in testConnectOptWrite()
968 // Temporarily disable the read callback
969 void tmpDisableReads(AsyncSocket* socket, ReadCallback* rcb) {
970 // Uninstall the read callback
971 socket->setReadCB(nullptr);
972 // Schedule the read callback to be reinstalled from the event loop via runInLoop()
973 socket->getEventBase()->runInLoop(
974 std::bind(&AsyncSocket::setReadCB, socket, rcb));
978 * Test connect+write, then have the connect callback perform another write.
980 * This tests interaction of the optimistic writing after connect with
981 * additional write attempts that occur in the connect callback.
983 void testConnectOptWrite(size_t size1, size_t size2, bool close = false) {
986 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
990 socket->connect(&ccb, server.getAddress(), 30);
992 // Hopefully the connect didn't succeed immediately.
993 // If it did, we can't exercise the optimistic write code path.
994 if (ccb.state == STATE_SUCCEEDED) {
995 LOG(INFO) << "connect() succeeded immediately; aborting test "
996 "of optimistic write behavior";
1000 // Tell the connect callback to perform a write when the connect succeeds
1002 scoped_array<char> buf2(new char[size2]);
1003 memset(buf2.get(), 'b', size2);
1005 ccb.successCallback = [&] { socket->write(&wcb2, buf2.get(), size2); };
1006 // Tell the second write callback to close the connection when it is done
1007 wcb2.successCallback = [&] { socket->closeNow(); };
1010 // Schedule one write() immediately, before the connect finishes
1011 scoped_array<char> buf1(new char[size1]);
1012 memset(buf1.get(), 'a', size1);
1015 socket->write(&wcb1, buf1.get(), size1);
1019 // immediately perform a close, before connect() completes
1023 // Start reading from the other endpoint after 10ms.
1024 // If we're using large buffers, we have to read so that the writes don't
1026 std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
1028 rcb.dataAvailableCallback = std::bind(tmpDisableReads,
1029 acceptedSocket.get(), &rcb);
1030 socket->getEventBase()->tryRunAfterDelay(
1031 std::bind(&AsyncSocket::setReadCB, acceptedSocket.get(), &rcb),
1034 // Loop. The server side of the connection was already accepted above; its
1035 // read callback is installed by the delayed task so that large writes can drain.
1038 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
1040 CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
1043 CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
1048 // Make sure the read callback received all of the data
1049 size_t bytesRead = 0;
1050 for (vector<ReadCallback::Buffer>::const_iterator it = rcb.buffers.begin();
1051 it != rcb.buffers.end();
1053 size_t start = bytesRead;
1054 bytesRead += it->length;
1055 size_t end = bytesRead;
1056 if (start < size1) {
1057 size_t cmpLen = min(size1, end) - start;
1058 CHECK_EQ(memcmp(it->buffer, buf1.get() + start, cmpLen), 0);
1060 if (end > size1 && end <= size1 + size2) {
1064 if (start >= size1) {
1066 buf2Offset = start - size1;
1067 cmpLen = end - start;
1069 itOffset = size1 - start;
1071 cmpLen = end - size1;
1073 CHECK_EQ(memcmp(it->buffer + itOffset, buf2.get() + buf2Offset,
1078 CHECK_EQ(bytesRead, size1 + size2);
1081 TEST(AsyncSocketTest, ConnectCallbackWrite) {
1082 // Test using small writes that should both succeed immediately
1083 testConnectOptWrite(100, 200);
1085 // Test using a large buffer in the connect callback, that should block
1086 const size_t largeSize = 8*1024*1024;
1087 testConnectOptWrite(100, largeSize);
1089 // Test using a large initial write
1090 testConnectOptWrite(largeSize, 100);
1092 // Test using two large buffers
1093 testConnectOptWrite(largeSize, largeSize);
1095 // Test a small write in the connect callback,
1096 // but no immediate write before connect completes
1097 testConnectOptWrite(0, 64);
1099 // Test a large write in the connect callback,
1100 // but no immediate write before connect completes
1101 testConnectOptWrite(0, largeSize);
1103 // Test connect, a small write, then immediately call close() before connect
1105 testConnectOptWrite(211, 0, true);
1107 // Test connect, a large immediate write (that will block), then immediately
1108 // call close() before connect completes
1109 testConnectOptWrite(largeSize, 0, true);
1112 ///////////////////////////////////////////////////////////////////////////
1113 // write() related tests
1114 ///////////////////////////////////////////////////////////////////////////
1117 * Test writing using a nullptr callback
1119 TEST(AsyncSocketTest, WriteNullCallback) {
1124 std::shared_ptr<AsyncSocket> socket =
1125 AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1126 evb.loop(); // loop until the socket is connected
1128 // write() with a nullptr callback
1130 memset(buf, 'a', sizeof(buf));
1131 socket->write(nullptr, buf, sizeof(buf));
1133 evb.loop(); // loop until the data is sent
1135 // Make sure the server got a connection and received the data
1137 server.verifyConnection(buf, sizeof(buf));
1141 * Test writing with a send timeout
1143 TEST(AsyncSocketTest, WriteTimeout) {
1148 std::shared_ptr<AsyncSocket> socket =
1149 AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1150 evb.loop(); // loop until the socket is connected
1152 // write() a large chunk of data, with no-one on the other end reading
1153 size_t writeLength = 8*1024*1024;
1154 uint32_t timeout = 200;
1155 socket->setSendTimeout(timeout);
1156 scoped_array<char> buf(new char[writeLength]);
1157 memset(buf.get(), 'a', writeLength);
1159 socket->write(&wcb, buf.get(), writeLength);
1165 // Make sure the write attempt timed out as requested
1166 CHECK_EQ(wcb.state, STATE_FAILED);
1167 CHECK_EQ(wcb.exception.getType(), AsyncSocketException::TIMED_OUT);
1169 // Check that the write timed out within a reasonable period of time.
1170 // We don't check for exactly the specified timeout, since AsyncSocket only
1171 // times out when it hasn't made progress for that period of time.
1173 // On Linux, the first write sends a few hundred kb of data, then blocks for
1174 // writability, and then unblocks again after 40ms and is able to write
1175 // another smaller chunk of data before blocking permanently. Therefore it doesn't
1176 // time out until 40ms + timeout.
1178 // I haven't fully verified the cause of this, but I believe it probably
1179 // occurs because the receiving end delays sending an ack for up to 40ms.
1180 // (This is the default value for TCP_DELACK_MIN.) Once the sender receives
1181 // the ack, it can send some more data. However, after that point the
1182 // receiver's kernel buffer is full. This 40ms delay happens even with
1183 // TCP_NODELAY and TCP_QUICKACK enabled on both endpoints. However, the
1184 // kernel may be automatically disabling TCP_QUICKACK after receiving some
1187 // For now, we simply check that the timeout occurred within 160ms of
1188 // the requested value.
1189 T_CHECK_TIMEOUT(start, end, milliseconds(timeout), milliseconds(160));
1193 * Test writing to a socket that the remote endpoint has closed
1195 TEST(AsyncSocketTest, WritePipeError) {
1200 std::shared_ptr<AsyncSocket> socket =
1201 AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1202 socket->setSendTimeout(1000);
1203 evb.loop(); // loop until the socket is connected
1205 // accept and immediately close the socket
1206 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
1207 acceptedSocket.reset();
1209 // write() a large chunk of data
1210 size_t writeLength = 8*1024*1024;
1211 scoped_array<char> buf(new char[writeLength]);
1212 memset(buf.get(), 'a', writeLength);
1214 socket->write(&wcb, buf.get(), writeLength);
1218 // Make sure the write failed.
1219 // It would be nice if AsyncSocketException could convey the errno value,
1220 // so that we could check for EPIPE
1221 CHECK_EQ(wcb.state, STATE_FAILED);
1222 CHECK_EQ(wcb.exception.getType(),
1223 AsyncSocketException::INTERNAL_ERROR);
1227 * Test writing a mix of simple buffers and IOBufs
// Issues three writes back to back on one connection -- a raw 5-byte buffer,
// a single-element IOBuf chain (7 bytes) and a two-element IOBuf chain
// (11 + 13 bytes) -- then shuts down the write side. The reader must see all
// 36 bytes in one buffer, in the order they were written.
1229 TEST(AsyncSocketTest, WriteIOBuf) {
1234 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
1236 socket->connect(&ccb, server.getAddress(), 30);
1238 // Accept the connection
1239 std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
1241 acceptedSocket->setReadCB(&rcb);
1243 // Write a simple buffer to the socket
// The length is const so that simpleBuf is an ordinary fixed-size array. With
// a non-const bound it would be a variable-length array, which is a compiler
// extension rather than standard C++.
1244 const size_t simpleBufLength = 5;
1245 char simpleBuf[simpleBufLength];
1246 memset(simpleBuf, 'a', simpleBufLength);
1248 socket->write(&wcb, simpleBuf, simpleBufLength);
1250 // Write a single-element IOBuf chain
1251 size_t buf1Length = 7;
1252 unique_ptr<IOBuf> buf1(IOBuf::create(buf1Length));
1253 memset(buf1->writableData(), 'b', buf1Length);
1254 buf1->append(buf1Length);
// Keep a clone: buf1 itself is moved into writeChain() below, and the clone is
// what the received bytes are compared against at the end of the test.
1255 unique_ptr<IOBuf> buf1Copy(buf1->clone());
1257 socket->writeChain(&wcb2, std::move(buf1));
1259 // Write a multiple-element IOBuf chain
1260 size_t buf2Length = 11;
1261 unique_ptr<IOBuf> buf2(IOBuf::create(buf2Length));
1262 memset(buf2->writableData(), 'c', buf2Length);
1263 buf2->append(buf2Length);
1264 size_t buf3Length = 13;
1265 unique_ptr<IOBuf> buf3(IOBuf::create(buf3Length));
1266 memset(buf3->writableData(), 'd', buf3Length);
1267 buf3->append(buf3Length);
1268 buf2->appendChain(std::move(buf3));
// Clone the whole chain before it is moved away. coalesce() presumably
// flattens the clone so data()/length() cover both elements in the final
// memcmp -- confirm against the IOBuf documentation.
1269 unique_ptr<IOBuf> buf2Copy(buf2->clone());
1270 buf2Copy->coalesce();
1272 socket->writeChain(&wcb3, std::move(buf2));
1273 socket->shutdownWrite();
1275 // Let the reads and writes run to completion
1278 CHECK_EQ(wcb.state, STATE_SUCCEEDED);
1279 CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
1280 CHECK_EQ(wcb3.state, STATE_SUCCEEDED);
1282 // Make sure the reader got the right data in the right order
1283 CHECK_EQ(rcb.state, STATE_SUCCEEDED);
1284 CHECK_EQ(rcb.buffers.size(), 1);
1285 CHECK_EQ(rcb.buffers[0].length,
1286 simpleBufLength + buf1Length + buf2Length + buf3Length);
1288 memcmp(rcb.buffers[0].buffer, simpleBuf, simpleBufLength), 0);
1290 memcmp(rcb.buffers[0].buffer + simpleBufLength,
1291 buf1Copy->data(), buf1Copy->length()), 0);
1293 memcmp(rcb.buffers[0].buffer + simpleBufLength + buf1Length,
1294 buf2Copy->data(), buf2Copy->length()), 0);
1296 acceptedSocket->close();
// Checks how writes are grouped on the wire when the cork flag is used: buf1
// (5 bytes) is written immediately, buf2 (7 bytes) and buf3 (11 bytes) are
// written later through DelayedWrite timers. The reader is expected to see
// exactly two buffers: buf1 alone, then buf2 and buf3 together.
1300 TEST(AsyncSocketTest, WriteIOBufCorked) {
1305 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
1307 socket->connect(&ccb, server.getAddress(), 30);
1309 // Accept the connection
1310 std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
1312 acceptedSocket->setReadCB(&rcb);
1314 // Do three writes, 100ms apart, with the "cork" flag set
1315 // on the second write. The reader should see the first write
1316 // arrive by itself, followed by the second and third writes
1317 // arriving together.
1318 size_t buf1Length = 5;
1319 unique_ptr<IOBuf> buf1(IOBuf::create(buf1Length));
1320 memset(buf1->writableData(), 'a', buf1Length);
1321 buf1->append(buf1Length);
1322 size_t buf2Length = 7;
1323 unique_ptr<IOBuf> buf2(IOBuf::create(buf2Length));
1324 memset(buf2->writableData(), 'b', buf2Length);
1325 buf2->append(buf2Length);
1326 size_t buf3Length = 11;
1327 unique_ptr<IOBuf> buf3(IOBuf::create(buf3Length));
1328 memset(buf3->writableData(), 'c', buf3Length);
1329 buf3->append(buf3Length);
1331 socket->writeChain(&wcb1, std::move(buf1));
// Per the comment above, the trailing `true` is the cork flag for the second
// write.
1333 DelayedWrite write2(socket, std::move(buf2), &wcb2, true);
1334 write2.scheduleTimeout(100);
// NOTE(review): the `false, true` pair is presumably cork=false plus a
// "last write" flag -- confirm against the DelayedWrite constructor.
1336 DelayedWrite write3(socket, std::move(buf3), &wcb3, false, true);
1337 write3.scheduleTimeout(200);
1340 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
1341 CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
1342 CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
// If the third write did not succeed, throw the exception recorded by its
// callback before reaching the plain state check below.
1343 if (wcb3.state != STATE_SUCCEEDED) {
1344 throw(wcb3.exception);
1346 CHECK_EQ(wcb3.state, STATE_SUCCEEDED);
1348 // Make sure the reader got the data with the right grouping
1349 CHECK_EQ(rcb.state, STATE_SUCCEEDED);
1350 CHECK_EQ(rcb.buffers.size(), 2);
1351 CHECK_EQ(rcb.buffers[0].length, buf1Length);
1352 CHECK_EQ(rcb.buffers[1].length, buf2Length + buf3Length);
1354 acceptedSocket->close();
1359 * Test performing a zero-length write
// Interleaves two zero-length write() calls with two 1MB writes and checks
// that all four write callbacks report success and that the reader receives
// exactly the len1 + len2 bytes held in buf.
1361 TEST(AsyncSocketTest, ZeroLengthWrite) {
1366 std::shared_ptr<AsyncSocket> socket =
1367 AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1368 evb.loop(); // loop until the socket is connected
1370 auto acceptedSocket = server.acceptAsync(&evb);
1372 acceptedSocket->setReadCB(&rcb);
1374 size_t len1 = 1024*1024;
1375 size_t len2 = 1024*1024;
1376 std::unique_ptr<char[]> buf(new char[len1 + len2]);
// Fill the first half with 'a' and the second half with 'b'. The second fill
// must start at offset len1; starting it at offset 0 would overwrite the 'a'
// region and leave the second half uninitialized before it is sent.
1377 memset(buf.get(), 'a', len1);
1378 memset(buf.get() + len1, 'b', len2);
// Writes 1 and 3 are the zero-length ones; writes 2 and 4 carry the two
// halves of buf.
1384 socket->write(&wcb1, buf.get(), 0);
1385 socket->write(&wcb2, buf.get(), len1);
1386 socket->write(&wcb3, buf.get() + len1, 0);
1387 socket->write(&wcb4, buf.get() + len1, len2);
1390 evb.loop(); // loop until the data is sent
1392 CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
1393 CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
1394 CHECK_EQ(wcb3.state, STATE_SUCCEEDED);
1395 CHECK_EQ(wcb4.state, STATE_SUCCEEDED);
1396 rcb.verifyData(buf.get(), len1 + len2);
// Same idea as the zero-length write() test, but through a single writev()
// call over a four-entry iovec array. The one write callback must succeed and
// the reader must receive exactly the len1 + len2 bytes held in buf.
1399 TEST(AsyncSocketTest, ZeroLengthWritev) {
1404 std::shared_ptr<AsyncSocket> socket =
1405 AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1406 evb.loop(); // loop until the socket is connected
1408 auto acceptedSocket = server.acceptAsync(&evb);
1410 acceptedSocket->setReadCB(&rcb);
1412 size_t len1 = 1024*1024;
1413 size_t len2 = 1024*1024;
1414 std::unique_ptr<char[]> buf(new char[len1 + len2]);
// Fill the first half with 'a' and the second half with 'b'. The second fill
// must start at offset len1; starting it at offset 0 would overwrite the 'a'
// region and leave the second half uninitialized before it is sent.
1415 memset(buf.get(), 'a', len1);
1416 memset(buf.get() + len1, 'b', len2);
// The count is const so that iov is an ordinary fixed-size array rather than
// a variable-length array (a compiler extension, not standard C++).
1419 const size_t iovCount = 4;
1420 struct iovec iov[iovCount];
1421 iov[0].iov_base = buf.get();
1422 iov[0].iov_len = len1;
1423 iov[1].iov_base = buf.get() + len1;
1425 iov[2].iov_base = buf.get() + len1;
1426 iov[2].iov_len = len2;
1427 iov[3].iov_base = buf.get() + len1 + len2;
1430 socket->writev(&wcb, iov, iovCount);
1432 evb.loop(); // loop until the data is sent
1434 CHECK_EQ(wcb.state, STATE_SUCCEEDED);
1435 rcb.verifyData(buf.get(), len1 + len2);
1438 ///////////////////////////////////////////////////////////////////////////
1439 // close() related tests
1440 ///////////////////////////////////////////////////////////////////////////
1443 * Test calling close() with pending writes when the socket is already closing.
// Queues writes until five of them have not completed immediately, arranges
// for each of those write callbacks to call close() on the socket from its
// error hook, and then checks that every queued callback ends in STATE_FAILED.
1445 TEST(AsyncSocketTest, ClosePendingWritesWhileClosing) {
1450 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
1452 socket->connect(&ccb, server.getAddress(), 30);
1454 // accept the socket on the server side
1455 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
1457 // Loop to ensure the connect has completed
1460 // Make sure we are connected
1461 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
1463 // Schedule pending writes, until several write attempts have blocked
1465 memset(buf, 'a', sizeof(buf));
1466 typedef vector< std::shared_ptr<WriteCallback> > WriteCallbackVector;
1467 WriteCallbackVector writeCallbacks;
// Only writes that did not succeed immediately are stored, so the loop ends
// once five blocked writes have been collected.
1469 writeCallbacks.reserve(5);
1470 while (writeCallbacks.size() < 5) {
1471 std::shared_ptr<WriteCallback> wcb(new WriteCallback);
1473 socket->write(wcb.get(), buf, sizeof(buf));
1474 if (wcb->state == STATE_SUCCEEDED) {
1475 // Succeeded immediately. Keep performing more writes
1479 // This write is blocked.
1480 // Have the write callback call close() when writeErr() is invoked
// The bound object is the raw socket pointer, so the error hook calls close()
// on the very socket whose writes are being failed.
1481 wcb->errorCallback = std::bind(&AsyncSocket::close, socket.get());
1482 writeCallbacks.push_back(wcb);
1485 // Call closeNow() to immediately fail the pending writes
1488 // Make sure writeErr() was invoked on all of the pending write callbacks
1489 for (WriteCallbackVector::const_iterator it = writeCallbacks.begin();
1490 it != writeCallbacks.end();
1492 CHECK_EQ((*it)->state, STATE_FAILED);
1498 // - Test connect() and have the connect callback set the read callback
1499 // - Test connect() and have the connect callback unset the read callback
1500 // - Test reading/writing/closing/destroying the socket in the connect callback
1501 // - Test reading/writing/closing/destroying the socket in the read callback
1502 // - Test reading/writing/closing/destroying the socket in the write callback
1503 // - Test one-way shutdown behavior
1504 // - Test changing the EventBase
1506 // - TODO: test multiple threads sharing an AsyncSocket, and detaching from it
1507 // in connectSuccess(), readDataAvailable(), writeSuccess()
1510 ///////////////////////////////////////////////////////////////////////////
1511 // AsyncServerSocket tests
1512 ///////////////////////////////////////////////////////////////////////////
1515 * Helper AcceptCallback class for the test code
1516 * It records the callbacks that were invoked, and also supports calling
1517 * generic std::function objects in each callback.
// AcceptCallback implementation used by the server-socket tests below. Each of
// the four callback methods appends an EventInfo record to events_, and each
// has an optional std::function hook that tests install through the set*Fn()
// methods.
1519 class TestAcceptCallback : public AsyncServerSocket::AcceptCallback {
// EventInfo constructors: (fd, addr) records an accepted connection, a
// message string records an error, and a bare EventType records the other
// event kinds (the callbacks below use it for TYPE_START and TYPE_STOP).
1528 EventInfo(int fd, const folly::SocketAddress& addr)
1529 : type(TYPE_ACCEPT),
1533 explicit EventInfo(const std::string& msg)
1538 explicit EventInfo(EventType et)
1545 int fd; // valid for TYPE_ACCEPT
1546 folly::SocketAddress address; // valid for TYPE_ACCEPT
1547 string errorMsg; // valid for TYPE_ERROR
1549 typedef std::deque<EventInfo> EventList;
1551 TestAcceptCallback()
1552 : connectionAcceptedFn_(),
// Accessor used by the tests to inspect the recorded events (they call
// size() and at() on the result).
1557 std::deque<EventInfo>* getEvents() {
// Hook setters: each stores a copy of the supplied function in the matching
// member.
1561 void setConnectionAcceptedFn(
1562 const std::function<void(int, const folly::SocketAddress&)>& fn) {
1563 connectionAcceptedFn_ = fn;
1565 void setAcceptErrorFn(const std::function<void(const std::exception&)>& fn) {
1566 acceptErrorFn_ = fn;
1568 void setAcceptStartedFn(const std::function<void()>& fn) {
1569 acceptStartedFn_ = fn;
1571 void setAcceptStoppedFn(const std::function<void()>& fn) {
1572 acceptStoppedFn_ = fn;
// Callback methods: the event is recorded first; the hook is consulted
// afterwards and only if one has been installed.
1575 void connectionAccepted(int fd, const folly::SocketAddress& clientAddr)
1577 events_.push_back(EventInfo(fd, clientAddr));
1579 if (connectionAcceptedFn_) {
1580 connectionAcceptedFn_(fd, clientAddr);
1583 void acceptError(const std::exception& ex) noexcept {
1584 events_.push_back(EventInfo(ex.what()));
1586 if (acceptErrorFn_) {
1590 void acceptStarted() noexcept {
1591 events_.push_back(EventInfo(TYPE_START));
1593 if (acceptStartedFn_) {
1597 void acceptStopped() noexcept {
1598 events_.push_back(EventInfo(TYPE_STOP));
1600 if (acceptStoppedFn_) {
// Optional per-event hooks; an empty std::function means "no hook".
1606 std::function<void(int, const folly::SocketAddress&)> connectionAcceptedFn_;
1607 std::function<void(const std::exception&)> acceptErrorFn_;
1608 std::function<void()> acceptStartedFn_;
1609 std::function<void()> acceptStoppedFn_;
// Events in the order the callbacks were invoked.
1611 std::deque<EventInfo> events_;
1615 * Make sure accepted sockets have O_NONBLOCK and TCP_NODELAY set
// Accepts one connection through an AsyncServerSocket and then inspects the
// accepted file descriptor directly with fcntl()/getsockopt() to check the
// options it was handed over with.
1617 TEST(AsyncSocketTest, ServerAcceptOptions) {
1618 EventBase eventBase;
1620 // Create a server socket
1621 std::shared_ptr<AsyncServerSocket> serverSocket(
1622 AsyncServerSocket::newSocket(&eventBase));
// NOTE(review): bind(0) presumably requests an OS-chosen port; the resulting
// address is read back below and used by the client -- confirm.
1623 serverSocket->bind(0);
1624 serverSocket->listen(16);
1625 folly::SocketAddress serverAddress;
1626 serverSocket->getAddress(&serverAddress);
1628 // Add a callback to accept one connection then stop the loop
1629 TestAcceptCallback acceptCallback;
// Both hooks remove the callback from the server socket, so the callback is
// unregistered after the first connection or the first error.
1630 acceptCallback.setConnectionAcceptedFn(
1631 [&](int fd, const folly::SocketAddress& addr) {
1632 serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
1634 acceptCallback.setAcceptErrorFn([&](const std::exception& ex) {
1635 serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
1637 serverSocket->addAcceptCallback(&acceptCallback, nullptr);
1638 serverSocket->startAccepting();
1640 // Connect to the server socket
1641 std::shared_ptr<AsyncSocket> socket(
1642 AsyncSocket::newSocket(&eventBase, serverAddress));
1646 // Verify that the server accepted a connection
// Expected sequence: start, one accepted connection, stop.
1647 CHECK_EQ(acceptCallback.getEvents()->size(), 3);
1648 CHECK_EQ(acceptCallback.getEvents()->at(0).type,
1649 TestAcceptCallback::TYPE_START);
1650 CHECK_EQ(acceptCallback.getEvents()->at(1).type,
1651 TestAcceptCallback::TYPE_ACCEPT);
1652 CHECK_EQ(acceptCallback.getEvents()->at(2).type,
1653 TestAcceptCallback::TYPE_STOP);
// The fd recorded with the TYPE_ACCEPT event is the accepted connection.
1654 int fd = acceptCallback.getEvents()->at(1).fd;
1656 // The accepted connection should already be in non-blocking mode
1657 int flags = fcntl(fd, F_GETFL, 0);
1658 CHECK_EQ(flags & O_NONBLOCK, O_NONBLOCK);
1661 // The accepted connection should already have TCP_NODELAY set
1663 socklen_t valueLength = sizeof(value);
1664 int rc = getsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &value, &valueLength);
1671 * Test AsyncServerSocket::removeAcceptCallback()
// Registers seven accept callbacks on one server socket and has them remove
// each other, remove themselves, open further client connections, and finally
// destroy the server socket, all from inside connectionAccepted hooks. The
// per-callback event lists are then compared against the expected sequences.
1673 TEST(AsyncSocketTest, RemoveAcceptCallback) {
1674 // Create a new AsyncServerSocket
1675 EventBase eventBase;
1676 std::shared_ptr<AsyncServerSocket> serverSocket(
1677 AsyncServerSocket::newSocket(&eventBase));
1678 serverSocket->bind(0);
1679 serverSocket->listen(16);
1680 folly::SocketAddress serverAddress;
1681 serverSocket->getAddress(&serverAddress);
1683 // Add several accept callbacks
1684 TestAcceptCallback cb1;
1685 TestAcceptCallback cb2;
1686 TestAcceptCallback cb3;
1687 TestAcceptCallback cb4;
1688 TestAcceptCallback cb5;
1689 TestAcceptCallback cb6;
1690 TestAcceptCallback cb7;
1692 // Test having callbacks remove other callbacks before them on the list,
1693 // after them on the list, or removing themselves.
1695 // Have callback 2 remove callback 3 and callback 5 the first time it is
// The trailing "// cbN: ..." notes on the newSocket() lines name the callback
// expected to receive that connection and what it then does.
1698 cb1.setConnectionAcceptedFn([&](int fd, const folly::SocketAddress& addr){
1699 std::shared_ptr<AsyncSocket> sock2(
1700 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb2: -cb3 -cb5
1702 cb3.setConnectionAcceptedFn([&](int fd, const folly::SocketAddress& addr){
1704 cb4.setConnectionAcceptedFn([&](int fd, const folly::SocketAddress& addr){
1705 std::shared_ptr<AsyncSocket> sock3(
1706 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb4
1708 cb5.setConnectionAcceptedFn([&](int fd, const folly::SocketAddress& addr){
1709 std::shared_ptr<AsyncSocket> sock5(
1710 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb7: -cb7
1713 cb2.setConnectionAcceptedFn(
1714 [&](int fd, const folly::SocketAddress& addr) {
1715 if (cb2Count == 0) {
1716 serverSocket->removeAcceptCallback(&cb3, nullptr);
1717 serverSocket->removeAcceptCallback(&cb5, nullptr);
1721 // Have callback 6 remove callback 4 the first time it is called,
1722 // and destroy the server socket the second time it is called
1724 cb6.setConnectionAcceptedFn(
1725 [&](int fd, const folly::SocketAddress& addr) {
1726 if (cb6Count == 0) {
1727 serverSocket->removeAcceptCallback(&cb4, nullptr);
1728 std::shared_ptr<AsyncSocket> sock6(
1729 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
1730 std::shared_ptr<AsyncSocket> sock7(
1731 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb2
1732 std::shared_ptr<AsyncSocket> sock8(
1733 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb6: stop
1736 serverSocket.reset();
1740 // Have callback 7 remove itself
1741 cb7.setConnectionAcceptedFn(
1742 [&](int fd, const folly::SocketAddress& addr) {
1743 serverSocket->removeAcceptCallback(&cb7, nullptr);
// Registration order matters: the expectations below rely on callbacks being
// served in the order they were added.
1746 serverSocket->addAcceptCallback(&cb1, nullptr);
1747 serverSocket->addAcceptCallback(&cb2, nullptr);
1748 serverSocket->addAcceptCallback(&cb3, nullptr);
1749 serverSocket->addAcceptCallback(&cb4, nullptr);
1750 serverSocket->addAcceptCallback(&cb5, nullptr);
1751 serverSocket->addAcceptCallback(&cb6, nullptr);
1752 serverSocket->addAcceptCallback(&cb7, nullptr);
1753 serverSocket->startAccepting();
1755 // Make several connections to the socket
1756 std::shared_ptr<AsyncSocket> sock1(
1757 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
1758 std::shared_ptr<AsyncSocket> sock4(
1759 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb6: -cb4
1761 // Loop until we are stopped
1764 // Check to make sure that the expected callbacks were invoked.
1766 // NOTE: This code depends on the AsyncServerSocket calling all of
1767 // the AcceptCallbacks in round-robin fashion, in the order that they were
1768 // added. The code is implemented this way right now, but the API doesn't
1769 // explicitly require it be done this way. If we change the code not to be
1770 // exactly round robin in the future, we can simplify the test checks here.
1771 // (We'll also need to update the termination code, since we expect cb6 to
1772 // get called twice to terminate the loop.)
1773 CHECK_EQ(cb1.getEvents()->size(), 4);
1774 CHECK_EQ(cb1.getEvents()->at(0).type,
1775 TestAcceptCallback::TYPE_START);
1776 CHECK_EQ(cb1.getEvents()->at(1).type,
1777 TestAcceptCallback::TYPE_ACCEPT);
1778 CHECK_EQ(cb1.getEvents()->at(2).type,
1779 TestAcceptCallback::TYPE_ACCEPT);
1780 CHECK_EQ(cb1.getEvents()->at(3).type,
1781 TestAcceptCallback::TYPE_STOP);
1783 CHECK_EQ(cb2.getEvents()->size(), 4);
1784 CHECK_EQ(cb2.getEvents()->at(0).type,
1785 TestAcceptCallback::TYPE_START);
1786 CHECK_EQ(cb2.getEvents()->at(1).type,
1787 TestAcceptCallback::TYPE_ACCEPT);
1788 CHECK_EQ(cb2.getEvents()->at(2).type,
1789 TestAcceptCallback::TYPE_ACCEPT);
1790 CHECK_EQ(cb2.getEvents()->at(3).type,
1791 TestAcceptCallback::TYPE_STOP);
// cb3 and cb5 are expected to see no connection at all: only start and stop.
1793 CHECK_EQ(cb3.getEvents()->size(), 2);
1794 CHECK_EQ(cb3.getEvents()->at(0).type,
1795 TestAcceptCallback::TYPE_START);
1796 CHECK_EQ(cb3.getEvents()->at(1).type,
1797 TestAcceptCallback::TYPE_STOP);
1799 CHECK_EQ(cb4.getEvents()->size(), 3);
1800 CHECK_EQ(cb4.getEvents()->at(0).type,
1801 TestAcceptCallback::TYPE_START);
1802 CHECK_EQ(cb4.getEvents()->at(1).type,
1803 TestAcceptCallback::TYPE_ACCEPT);
1804 CHECK_EQ(cb4.getEvents()->at(2).type,
1805 TestAcceptCallback::TYPE_STOP);
1807 CHECK_EQ(cb5.getEvents()->size(), 2);
1808 CHECK_EQ(cb5.getEvents()->at(0).type,
1809 TestAcceptCallback::TYPE_START);
1810 CHECK_EQ(cb5.getEvents()->at(1).type,
1811 TestAcceptCallback::TYPE_STOP);
1813 CHECK_EQ(cb6.getEvents()->size(), 4);
1814 CHECK_EQ(cb6.getEvents()->at(0).type,
1815 TestAcceptCallback::TYPE_START);
1816 CHECK_EQ(cb6.getEvents()->at(1).type,
1817 TestAcceptCallback::TYPE_ACCEPT);
1818 CHECK_EQ(cb6.getEvents()->at(2).type,
1819 TestAcceptCallback::TYPE_ACCEPT);
1820 CHECK_EQ(cb6.getEvents()->at(3).type,
1821 TestAcceptCallback::TYPE_STOP);
1823 CHECK_EQ(cb7.getEvents()->size(), 3);
1824 CHECK_EQ(cb7.getEvents()->at(0).type,
1825 TestAcceptCallback::TYPE_START);
1826 CHECK_EQ(cb7.getEvents()->at(1).type,
1827 TestAcceptCallback::TYPE_ACCEPT);
1828 CHECK_EQ(cb7.getEvents()->at(2).type,
1829 TestAcceptCallback::TYPE_STOP);
1833 * Test that accept callbacks run in the thread that drives the EventBase loop
// Registers one accept callback from the test thread, drives the event loop
// from a second thread, and has the callback hooks compare pthread_self()
// against the recorded thread id to check which thread invokes them.
1835 TEST(AsyncSocketTest, OtherThreadAcceptCallback) {
1836 // Create a new AsyncServerSocket
1837 EventBase eventBase;
1838 std::shared_ptr<AsyncServerSocket> serverSocket(
1839 AsyncServerSocket::newSocket(&eventBase));
1840 serverSocket->bind(0);
1841 serverSocket->listen(16);
1842 folly::SocketAddress serverAddress;
1843 serverSocket->getAddress(&serverAddress);
1845 // Add a single accept callback that checks which thread invokes it
1846 TestAcceptCallback cb1;
// thread_id starts as the id of the thread running this test body. The
// acceptStarted hook requires a different thread and records that one; the
// other two hooks then require the recorded thread.
1847 auto thread_id = pthread_self();
1848 cb1.setAcceptStartedFn([&](){
1849 CHECK_NE(thread_id, pthread_self());
1850 thread_id = pthread_self();
1852 cb1.setConnectionAcceptedFn([&](int fd, const folly::SocketAddress& addr){
1853 CHECK_EQ(thread_id, pthread_self());
1854 serverSocket->removeAcceptCallback(&cb1, nullptr);
1856 cb1.setAcceptStoppedFn([&](){
1857 CHECK_EQ(thread_id, pthread_self());
1860 // Register the callback and start accepting connections
1861 serverSocket->addAcceptCallback(&cb1, nullptr);
1862 serverSocket->startAccepting();
1864 // Make a single connection to the socket
1865 std::shared_ptr<AsyncSocket> sock1(
1866 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
1868 // Loop in another thread
1869 auto other = std::thread([&](){
1874 // Check to make sure that the expected callbacks were invoked.
1876 // NOTE: cb1 is the only callback registered in this test, so every
1877 // accept event is delivered to it. cb1 removes itself from the server
1878 // socket as soon as it sees its first connection, so exactly one
1879 // TYPE_ACCEPT is expected, framed by TYPE_START and TYPE_STOP.
1880 // The thread-affinity checks themselves live inside the hooks
1881 // installed above (they compare pthread_self() against the id recorded
1882 // by the acceptStarted hook).
1883 CHECK_EQ(cb1.getEvents()->size(), 3);
1884 CHECK_EQ(cb1.getEvents()->at(0).type,
1885 TestAcceptCallback::TYPE_START);
1886 CHECK_EQ(cb1.getEvents()->at(1).type,
1887 TestAcceptCallback::TYPE_ACCEPT);
1888 CHECK_EQ(cb1.getEvents()->at(2).type,
1889 TestAcceptCallback::TYPE_STOP);
// Shared helper for the useExistingSocket() tests: given a server socket that
// is already set up, it registers a one-shot accept callback, connects one
// client through the server's own EventBase, and checks that the callback saw
// start, one accepted connection, and stop.
//
// serverSocket: the socket under test; its EventBase and address are read
// back from it rather than passed in.
1893 void serverSocketSanityTest(AsyncServerSocket* serverSocket) {
1894 // Add a callback to accept one connection then stop accepting
1895 TestAcceptCallback acceptCallback;
1896 acceptCallback.setConnectionAcceptedFn(
1897 [&](int fd, const folly::SocketAddress& addr) {
1898 serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
1900 acceptCallback.setAcceptErrorFn([&](const std::exception& ex) {
1901 serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
1903 serverSocket->addAcceptCallback(&acceptCallback, nullptr);
1904 serverSocket->startAccepting();
1906 // Connect to the server socket
1907 EventBase* eventBase = serverSocket->getEventBase();
1908 folly::SocketAddress serverAddress;
1909 serverSocket->getAddress(&serverAddress);
1910 AsyncSocket::UniquePtr socket(new AsyncSocket(eventBase, serverAddress));
1912 // Loop to process all events
1915 // Verify that the server accepted a connection
1916 CHECK_EQ(acceptCallback.getEvents()->size(), 3);
1917 CHECK_EQ(acceptCallback.getEvents()->at(0).type,
1918 TestAcceptCallback::TYPE_START);
1919 CHECK_EQ(acceptCallback.getEvents()->at(1).type,
1920 TestAcceptCallback::TYPE_ACCEPT);
1921 CHECK_EQ(acceptCallback.getEvents()->at(2).type,
1922 TestAcceptCallback::TYPE_STOP);
1925 /* Verify that we don't leak sockets if we are destroyed()
1926 * and there are still writes pending
1928 * If destroy() only calls close() instead of closeNow(),
1929 * it would shutdown(writes) on the socket, but it would
1930 * never be close()'d, and the socket would leak
// Queues a 5,000,000-byte write on the accepted socket, drops the socket
// while that write is outstanding, and then probes the raw file descriptor
// with read() to check that it was really closed.
1932 TEST(AsyncSocketTest, DestroyCloseTest) {
1938 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&clientEB);
1940 socket->connect(&ccb, server.getAddress(), 30);
1942 // Accept the connection
1943 std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&serverEB);
1945 acceptedSocket->setReadCB(&rcb);
1947 // Write a large buffer to the socket that is larger than kernel buffer
1948 size_t simpleBufLength = 5000000;
1949 char* simpleBuf = new char[simpleBufLength];
1950 memset(simpleBuf, 'a', simpleBufLength);
1953 // Remember the raw fd so it can still be probed after the socket object
// is gone
1954 int fd = acceptedSocket->getFd();
1956 acceptedSocket->write(&wcb, simpleBuf, simpleBufLength);
// Release our reference to the accepted socket while the write above is still
// queued.
1958 acceptedSocket.reset();
1960 // Test that server socket was closed
// The same buffer is reused as scratch space for the probing read().
1961 ssize_t sz = read(fd, simpleBuf, simpleBufLength);
1968 * Test AsyncServerSocket::useExistingSocket()
// Exercises useExistingSocket() with a manually created fd in three states:
// (1) freshly created, with AsyncServerSocket doing bind and listen,
// (2) already bound, with AsyncServerSocket doing listen only,
// (3) already bound and listening.
// Each variant ends by running serverSocketSanityTest() on the result.
1970 TEST(AsyncSocketTest, ServerExistingSocket) {
1971 EventBase eventBase;
1973 // Test creating a socket, and letting AsyncServerSocket bind and listen
1975 // Manually create a socket
1976 int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
1979 // Create a server socket
1980 AsyncServerSocket::UniquePtr serverSocket(
1981 new AsyncServerSocket(&eventBase));
1982 serverSocket->useExistingSocket(fd);
// The address is read back from the adopted socket and then handed to bind().
1983 folly::SocketAddress address;
1984 serverSocket->getAddress(&address);
1986 serverSocket->bind(address);
1987 serverSocket->listen(16);
1989 // Make sure the socket works
1990 serverSocketSanityTest(serverSocket.get());
1993 // Test creating a socket and binding manually,
1994 // then letting AsyncServerSocket listen
1996 // Manually create a socket
1997 int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
2000 struct sockaddr_in addr;
2001 addr.sin_family = AF_INET;
2003 addr.sin_addr.s_addr = INADDR_ANY;
2004 CHECK_EQ(bind(fd, reinterpret_cast<struct sockaddr*>(&addr),
2006 // Look up the address that we bound to
2007 folly::SocketAddress boundAddress;
2008 boundAddress.setFromLocalAddress(fd);
2010 // Create a server socket
2011 AsyncServerSocket::UniquePtr serverSocket(
2012 new AsyncServerSocket(&eventBase));
2013 serverSocket->useExistingSocket(fd);
2014 serverSocket->listen(16);
2016 // Make sure AsyncServerSocket reports the same address that we bound to
2017 folly::SocketAddress serverSocketAddress;
2018 serverSocket->getAddress(&serverSocketAddress);
2019 CHECK_EQ(boundAddress, serverSocketAddress);
2021 // Make sure the socket works
2022 serverSocketSanityTest(serverSocket.get());
2025 // Test creating a socket, binding and listening manually,
2026 // then giving it to AsyncServerSocket
2028 // Manually create a socket
2029 int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
2032 struct sockaddr_in addr;
2033 addr.sin_family = AF_INET;
2035 addr.sin_addr.s_addr = INADDR_ANY;
2036 CHECK_EQ(bind(fd, reinterpret_cast<struct sockaddr*>(&addr),
2038 // Look up the address that we bound to
2039 folly::SocketAddress boundAddress;
2040 boundAddress.setFromLocalAddress(fd);
2042 CHECK_EQ(listen(fd, 16), 0);
2044 // Create a server socket
2045 AsyncServerSocket::UniquePtr serverSocket(
2046 new AsyncServerSocket(&eventBase));
2047 serverSocket->useExistingSocket(fd);
2049 // Make sure AsyncServerSocket reports the same address that we bound to
2050 folly::SocketAddress serverSocketAddress;
2051 serverSocket->getAddress(&serverSocketAddress);
2052 CHECK_EQ(boundAddress, serverSocketAddress);
2054 // Make sure the socket works
2055 serverSocketSanityTest(serverSocket.get());
// Binds an AsyncServerSocket to a filesystem path instead of a TCP port,
// accepts one client connection on it, and checks the recorded accept events
// and the O_NONBLOCK flag of the accepted fd.
2059 TEST(AsyncSocketTest, UnixDomainSocketTest) {
2060 EventBase eventBase;
2062 // Create a server socket
2063 std::shared_ptr<AsyncServerSocket> serverSocket(
2064 AsyncServerSocket::newSocket(&eventBase));
// The server address is built from a path; "/anonymous" is appended to the
// existing value of `path`.
2066 path.append("/anonymous");
2067 folly::SocketAddress serverAddress;
2068 serverAddress.setFromPath(path);
2069 serverSocket->bind(serverAddress);
2070 serverSocket->listen(16);
2072 // Add a callback to accept one connection then stop the loop
2073 TestAcceptCallback acceptCallback;
2074 acceptCallback.setConnectionAcceptedFn(
2075 [&](int fd, const folly::SocketAddress& addr) {
2076 serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
2078 acceptCallback.setAcceptErrorFn([&](const std::exception& ex) {
2079 serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
2081 serverSocket->addAcceptCallback(&acceptCallback, nullptr);
2082 serverSocket->startAccepting();
2084 // Connect to the server socket
2085 std::shared_ptr<AsyncSocket> socket(
2086 AsyncSocket::newSocket(&eventBase, serverAddress));
2090 // Verify that the server accepted a connection
// Expected sequence: start, one accepted connection, stop.
2091 CHECK_EQ(acceptCallback.getEvents()->size(), 3);
2092 CHECK_EQ(acceptCallback.getEvents()->at(0).type,
2093 TestAcceptCallback::TYPE_START);
2094 CHECK_EQ(acceptCallback.getEvents()->at(1).type,
2095 TestAcceptCallback::TYPE_ACCEPT);
2096 CHECK_EQ(acceptCallback.getEvents()->at(2).type,
2097 TestAcceptCallback::TYPE_STOP);
2098 int fd = acceptCallback.getEvents()->at(1).fd;
2100 // The accepted connection should already be in non-blocking mode
2101 int flags = fcntl(fd, F_GETFL, 0);
2102 CHECK_EQ(flags & O_NONBLOCK, O_NONBLOCK);