From: Christopher Dykes Date: Tue, 9 Aug 2016 23:58:56 +0000 (-0700) Subject: Support read and write from blocking and non-blocking pipes and sockets X-Git-Tag: v2016.08.15.00~28 X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=60136d876fd415a26519a95a026d02251e249254;p=folly.git Support read and write from blocking and non-blocking pipes and sockets Summary: Also switch `pipe` to return a socket pair instead, as libevent can't poll against a pipe on Windows. And lastly, fix the file descriptor for sockets leaking when close is called. Reviewed By: yfeldblum Differential Revision: D3691255 fbshipit-source-id: c684242d255ac5158a4c805d432d345dac1b3bd8 --- diff --git a/folly/portability/Unistd.cpp b/folly/portability/Unistd.cpp index 33a8f7a4..27fc7c77 100755 --- a/folly/portability/Unistd.cpp +++ b/folly/portability/Unistd.cpp @@ -54,10 +54,24 @@ int access(char const* fn, int am) { return _access(fn, am); } int chdir(const char* path) { return _chdir(path); } +// We aren't hooking into the internals of the CRT, nope, not at all. +extern "C" int __cdecl _free_osfhnd(int const fh); int close(int fh) { if (folly::portability::sockets::is_fh_socket(fh)) { SOCKET h = (SOCKET)_get_osfhandle(fh); - return closesocket(h); + + // If we were to just call _close on the descriptor, it would + // close the HANDLE, but it wouldn't free any of the resources + // associated to the SOCKET, and we can't call _close after + // calling closesocket, because closesocket has already closed + // the HANDLE, and _close would attempt to close the HANDLE + // again, resulting in a double free. + // Luckily though, there is a function in the internals of the + // CRT that is used to free only the file descriptor, so we + // can call that to avoid leaking the file descriptor itself. + auto c = closesocket(h); + _free_osfhnd(fh); + return c; } return _close(fh); } @@ -114,7 +128,11 @@ long lseek(int fh, long off, int orig) { return _lseek(fh, off, orig); } int rmdir(const char* path) { return _rmdir(path); } -int pipe(int* pth) { return _pipe(pth, 0, _O_BINARY); } +int pipe(int pth[2]) { + // We need to be able to listen to pipes with + // libevent, so they need to be actual sockets. + return socketpair(PF_UNIX, SOCK_STREAM, 0, pth); +} int pread(int fd, void* buf, size_t count, off_t offset) { return wrapPositional(_read, fd, offset, buf, (unsigned int)count); @@ -124,7 +142,26 @@ int pwrite(int fd, const void* buf, size_t count, off_t offset) { return wrapPositional(_write, fd, offset, buf, (unsigned int)count); } -int read(int fh, void* buf, unsigned int mcc) { return _read(fh, buf, mcc); } +int read(int fh, void* buf, unsigned int mcc) { + if (folly::portability::sockets::is_fh_socket(fh)) { + SOCKET s = (SOCKET)_get_osfhandle(fh); + if (s != INVALID_SOCKET) { + auto r = folly::portability::sockets::recv(fh, buf, (size_t)mcc, 0); + if (r == -1 && WSAGetLastError() == WSAEWOULDBLOCK) { + errno = EAGAIN; + } + return r; + } + } + auto r = _read(fh, buf, mcc); + if (r == -1 && GetLastError() == ERROR_NO_DATA) { + // This only happens if the file was non-blocking and + // no data was present. We have to translate the error + // to a form that the rest of the world is expecting. + errno = EAGAIN; + } + return r; +} ssize_t readlink(const char* path, char* buf, size_t buflen) { if (!buflen) { @@ -198,8 +235,36 @@ int usleep(unsigned int ms) { return 0; } -int write(int fh, void const* buf, unsigned int mcc) { - return _write(fh, buf, mcc); +int write(int fh, void const* buf, unsigned int count) { + if (folly::portability::sockets::is_fh_socket(fh)) { + SOCKET s = (SOCKET)_get_osfhandle(fh); + if (s != INVALID_SOCKET) { + auto r = folly::portability::sockets::send(fh, buf, (size_t)count, 0); + if (r == -1 && WSAGetLastError() == WSAEWOULDBLOCK) { + errno = EAGAIN; + } + return r; + } + } + auto r = _write(fh, buf, count); + if ((r > 0 && r != count) || (r == -1 && errno == ENOSPC)) { + // Writing to a pipe with a full buffer doesn't generate + // any error type, unless it caused us to write exactly 0 + // bytes, so we have to see if we have a pipe first. We + // don't touch the errno for anything else. + HANDLE h = (HANDLE)_get_osfhandle(fh); + if (GetFileType(h) == FILE_TYPE_PIPE) { + DWORD state = 0; + if (GetNamedPipeHandleState( + h, &state, nullptr, nullptr, nullptr, nullptr, 0)) { + if ((state & PIPE_NOWAIT) == PIPE_NOWAIT) { + errno = EAGAIN; + return -1; + } + } + } + } + return r; } } } diff --git a/folly/portability/Unistd.h b/folly/portability/Unistd.h index 717fe03d..4b0097e3 100755 --- a/folly/portability/Unistd.h +++ b/folly/portability/Unistd.h @@ -71,7 +71,7 @@ int lockf(int fd, int cmd, off_t len); long lseek(int fh, long off, int orig); int read(int fh, void* buf, unsigned int mcc); int rmdir(const char* path); -int pipe(int* pth); +int pipe(int pth[2]); int pread(int fd, void* buf, size_t count, off_t offset); int pwrite(int fd, const void* buf, size_t count, off_t offset); ssize_t readlink(const char* path, char* buf, size_t buflen);