Properly support socketpair and reading and writing non-blocking sockets
authorChristopher Dykes <cdykes@fb.com>
Tue, 9 Aug 2016 23:44:11 +0000 (16:44 -0700)
committerFacebook Github Bot 8 <facebook-github-bot-8-bot@fb.com>
Tue, 9 Aug 2016 23:53:30 +0000 (16:53 -0700)
Summary:
Also handle invalid file descriptors correctly and switch away from `WSASendMsg` for the implementation of `sendmsg` due to limitations imposed by WinSock.
This also fixes up a few places that were explicitly referencing the global version of some socket functions, which was bypassing the socket portability layer.

Reviewed By: yfeldblum

Differential Revision: D3692358

fbshipit-source-id: 05f394dcc9bac0591df7581345071ba2501b7695

folly/io/async/AsyncSocket.cpp
folly/io/async/test/AsyncSocketTest.h
folly/io/async/test/AsyncSocketTest2.cpp
folly/portability/Sockets.cpp
folly/portability/Sockets.h

index f8b42a74141d39a9b66ed3e39550f18dd9755850..ab988041a915dc8f1f47e10d1f1af15f7b52005d 100644 (file)
@@ -468,7 +468,7 @@ void AsyncSocket::connect(ConnectCallback* callback,
 }
 
 int AsyncSocket::socketConnect(const struct sockaddr* saddr, socklen_t len) {
-  int rv = ::connect(fd_, saddr, len);
+  int rv = fsp::connect(fd_, saddr, len);
   if (rv < 0) {
     auto errnoCopy = errno;
     if (errnoCopy == EINPROGRESS) {
index 1302e9c0c55fe83b2906589579d968b62d2bc349..d57080a90278de4f03068fe2b8949d5c34c7d094 100644 (file)
@@ -246,6 +246,7 @@ class TestServer {
   }
 
   int acceptFD(int timeout=50) {
+    namespace fsp = folly::portability::sockets;
     struct pollfd pfd;
     pfd.fd = fd_;
     pfd.events = POLLIN;
@@ -261,7 +262,7 @@ class TestServer {
           errno);
     }
 
-    int acceptedFd = ::accept(fd_, nullptr, nullptr);
+    int acceptedFd = fsp::accept(fd_, nullptr, nullptr);
     if (acceptedFd < 0) {
       throw folly::AsyncSocketException(
           folly::AsyncSocketException::INTERNAL_ERROR,
index bd38472619a37d71d794e79cf629c4f5d4a846b3..7ba0a4f89490898d93d61f86933501a767261d0a 100644 (file)
@@ -698,7 +698,7 @@ TEST(AsyncSocketTest, ConnectReadWriteAndShutdownWrite) {
   acceptedSocket->flush();
   // Shutdown writes to the accepted socket.  This will cause it to see EOF
   // and uninstall the read callback.
-  ::shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
+  shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
 
   // Loop
   evb.loop();
@@ -791,7 +791,7 @@ TEST(AsyncSocketTest, ConnectReadWriteAndShutdownWriteNow) {
   acceptedSocket->flush();
   // Shutdown writes to the accepted socket.  This will cause it to see EOF
   // and uninstall the read callback.
-  ::shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
+  shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
 
   // Loop
   evb.loop();
index 928573d3bd3873c235990fb33b52427fb8cd67e3..61cf6d0f3840f9b22d77fa352edd42a3d9aaaf45 100755 (executable)
@@ -21,6 +21,8 @@
 #include <errno.h>
 #include <fcntl.h>
 
+#include <event2/util.h>
+
 #include <MSWSock.h>
 
 #include <folly/ScopeGuard.h>
@@ -48,6 +50,9 @@ bool is_fh_socket(int fh) {
 }
 
 SOCKET fd_to_socket(int fd) {
+  if (fd == -1) {
+    return INVALID_SOCKET;
+  }
   // We do this in a roundabout way to allow us to compile even if
   // we're doing a bit of trickery to ensure that things aren't
   // being implicitly converted to a SOCKET by temporarily
@@ -59,6 +64,9 @@ SOCKET fd_to_socket(int fd) {
 }
 
 int socket_to_fd(SOCKET s) {
+  if (s == INVALID_SOCKET) {
+    return -1;
+  }
   return _open_osfhandle((intptr_t)s, O_RDWR | O_BINARY);
 }
 
@@ -79,7 +87,11 @@ int bind(int s, const struct sockaddr* name, socklen_t namelen) {
 }
 
 int connect(int s, const struct sockaddr* name, socklen_t namelen) {
-  return wrapSocketFunction<int>(::connect, s, name, namelen);
+  auto r = wrapSocketFunction<int>(::connect, s, name, namelen);
+  if (r == -1 && errno == WSAEWOULDBLOCK) {
+    errno = EINPROGRESS;
+  }
+  return r;
 }
 
 int getpeername(int s, struct sockaddr* name, socklen_t* namelen) {
@@ -228,23 +240,23 @@ ssize_t sendmsg(int s, const struct msghdr* message, int fl) {
   if (message->msg_name != nullptr || message->msg_namelen != 0) {
     return (ssize_t)-1;
   }
-  WSAMSG msg;
-  msg.name = nullptr;
-  msg.namelen = 0;
-  msg.Control.buf = (CHAR*)message->msg_control;
-  msg.Control.len = (ULONG)message->msg_controllen;
-  msg.dwFlags = 0;
-  msg.dwBufferCount = (DWORD)message->msg_iovlen;
-  msg.lpBuffers = new WSABUF[message->msg_iovlen];
-  SCOPE_EXIT { delete[] msg.lpBuffers; };
+
+  // Unfortunately, WSASendMsg requires the socket to have been opened
+  // as either SOCK_DGRAM or SOCK_RAW, but sendmsg has no such requirement,
+  // so we have to implement it based on send instead :(
+  ssize_t bytesSent = 0;
   for (size_t i = 0; i < message->msg_iovlen; i++) {
-    msg.lpBuffers[i].buf = (CHAR*)message->msg_iov[i].iov_base;
-    msg.lpBuffers[i].len = (ULONG)message->msg_iov[i].iov_len;
+    auto r = ::send(
+        h,
+        (const char*)message->msg_iov[i].iov_base,
+        message->msg_iov[i].iov_len,
+        message->msg_flags);
+    if (r == -1) {
+      return -1;
+    }
+    bytesSent += r;
   }
-
-  DWORD bytesSent;
-  int res = WSASendMsg(h, &msg, 0, &bytesSent, nullptr, nullptr);
-  return res == 0 ? (ssize_t)bytesSent : -1;
+  return bytesSent;
 }
 
 ssize_t sendto(
@@ -309,8 +321,17 @@ int socket(int af, int type, int protocol) {
 }
 
 int socketpair(int domain, int type, int protocol, int sv[2]) {
-  // Stub this out for now, to get things compiling.
-  return -1;
+  if (domain != PF_UNIX || type != SOCK_STREAM || protocol != 0) {
+    return -1;
+  }
+  intptr_t pair[2];
+  auto r = evutil_socketpair(AF_INET, type, protocol, pair);
+  if (r == -1) {
+    return r;
+  }
+  sv[0] = _open_osfhandle(pair[0], O_RDWR | O_BINARY);
+  sv[1] = _open_osfhandle(pair[1], O_RDWR | O_BINARY);
+  return 0;
 }
 }
 }
index dcabc13f7610159e25ca186f2a43fe7449ce366c..83e1b92e5c24a712dc9d1ebcf22bf0b174b8c339 100755 (executable)
@@ -37,7 +37,7 @@ using sa_family_t = ADDRESS_FAMILY;
 
 // We don't actually support either of these flags
 // currently.
-#define MSG_DONTWAIT 1
+#define MSG_DONTWAIT 0
 #define MSG_EOR 0
 struct msghdr {
   void* msg_name;
@@ -61,6 +61,7 @@ struct sockaddr_un {
 // These are the same, but PF_LOCAL
 // isn't defined by WinSock.
 #define PF_LOCAL PF_UNIX
+#define SO_REUSEPORT SO_REUSEADDR
 
 // Someone thought it would be a good idea
 // to define a field via a macro...
@@ -71,6 +72,7 @@ namespace folly {
 namespace portability {
 namespace sockets {
 #ifndef _WIN32
+using ::accept;
 using ::bind;
 using ::connect;
 using ::getpeername;