From 30c26b8fbc31bcb51a3c903a84ee54fb7d03f2c9 Mon Sep 17 00:00:00 2001 From: Tudor Bosman Date: Tue, 1 Apr 2014 10:36:27 -0700 Subject: [PATCH] Add ability to silence callbacks for Subprocess::communicate Summary: Subprocess::communicate callbacks are level-triggered, which makes writing "chatty" communication protocols difficult -- you often want to silence the write callback until you read the expected message. Add Subprocess::enableNotifications() for this purpose. Test Plan: test added Reviewed By: lucian@fb.com FB internal diff: D1251564 @override-unit-failures --- folly/Subprocess.cpp | 18 +++++- folly/Subprocess.h | 20 ++++++- folly/test/SubprocessTest.cpp | 100 ++++++++++++++++++++++++++++++++++ 3 files changed, 134 insertions(+), 4 deletions(-) diff --git a/folly/Subprocess.cpp b/folly/Subprocess.cpp index 96049d65..b5f3e1db 100644 --- a/folly/Subprocess.cpp +++ b/folly/Subprocess.cpp @@ -700,7 +700,15 @@ void Subprocess::communicate(FdCallback readCallback, pfd.fd = p.parentFd; // Yes, backwards, PIPE_IN / PIPE_OUT are defined from the // child's point of view. - pfd.events = (p.direction == PIPE_IN ? POLLOUT : POLLIN); + if (!p.enabled) { + // Still keeping fd in watched set so we get notified of POLLHUP / + // POLLERR + pfd.events = 0; + } else if (p.direction == PIPE_IN) { + pfd.events = POLLOUT; + } else { + pfd.events = POLLIN; + } fds.push_back(pfd); } @@ -747,6 +755,14 @@ void Subprocess::communicate(FdCallback readCallback, } } +void Subprocess::enableNotifications(int childFd, bool enabled) { + pipes_[findByChildFd(childFd)].enabled = enabled; +} + +bool Subprocess::notificationsEnabled(int childFd) const { + return pipes_[findByChildFd(childFd)].enabled; +} + int Subprocess::findByChildFd(int childFd) const { auto pos = std::lower_bound( pipes_.begin(), pipes_.end(), childFd, diff --git a/folly/Subprocess.h b/folly/Subprocess.h index c0b69e46..d3a9fb8e 100644 --- a/folly/Subprocess.h +++ b/folly/Subprocess.h @@ -383,6 +383,18 @@ class Subprocess : private boost::noncopyable { typedef std::function FdCallback; void communicate(FdCallback readCallback, FdCallback writeCallback); + /** + * Enable notifications (callbacks) for one pipe to/from child. By default, + * all are enabled. Useful for "chatty" communication -- you want to disable + * write callbacks until you receive the expected message. + */ + void enableNotifications(int childFd, bool enabled); + + /** + * Are notifications for one pipe to/from child enabled? + */ + bool notificationsEnabled(int childFd) const; + /** * Return the child's pid, or -1 if the child wasn't successfully spawned * or has already been wait()ed upon. @@ -504,9 +516,11 @@ class Subprocess : private boost::noncopyable { // so we're happy with a vector here, even if it means linear erase. // sorted by childFd struct PipeInfo : private boost::totally_ordered { - int parentFd; - int childFd; - int direction; // one of PIPE_IN / PIPE_OUT + int parentFd = -1; + int childFd = -1; + int direction = PIPE_IN; // one of PIPE_IN / PIPE_OUT + bool enabled = true; + bool operator<(const PipeInfo& other) const { return childFd < other.childFd; } diff --git a/folly/test/SubprocessTest.cpp b/folly/test/SubprocessTest.cpp index f17f3a51..9be77f2e 100644 --- a/folly/test/SubprocessTest.cpp +++ b/folly/test/SubprocessTest.cpp @@ -26,6 +26,7 @@ #include "folly/Exception.h" #include "folly/Format.h" +#include "folly/FileUtil.h" #include "folly/String.h" #include "folly/gen/Base.h" #include "folly/gen/File.h" @@ -292,3 +293,102 @@ TEST(CommunicateSubprocessTest, Duplex2) { } }); } + +namespace { + +bool readToString(int fd, std::string& buf, size_t maxSize) { + size_t bytesRead = 0; + + buf.resize(maxSize); + char* dest = &buf.front(); + size_t remaining = maxSize; + + ssize_t n = -1; + while (remaining) { + n = ::read(fd, dest, remaining); + if (n == -1) { + if (errno == EINTR) { + continue; + } + if (errno == EAGAIN) { + break; + } + PCHECK("read failed"); + } else if (n == 0) { + break; + } + dest += n; + remaining -= n; + } + + buf.resize(dest - buf.data()); + return (n == 0); +} + +} // namespace + +TEST(CommunicateSubprocessTest, Chatty) { + checkFdLeak([] { + const int lineCount = 1000; + + int wcount = 0; + int rcount = 0; + + auto options = Subprocess::pipeStdin().pipeStdout().pipeStderr().usePath(); + std::vector cmd { + "sed", + "-u", + "-e", + "s/a test/a successful test/", + }; + + Subprocess proc(cmd, options); + + auto writeCallback = [&] (int pfd, int cfd) -> bool { + EXPECT_EQ(0, cfd); // child stdin + EXPECT_EQ(rcount, wcount); // chatty, one read for every write + + auto msg = folly::to("a test ", wcount, "\n"); + + // Not entirely kosher, we should handle partial writes, but this is + // fine for writes <= PIPE_BUF + EXPECT_EQ(msg.size(), writeFull(pfd, msg.data(), msg.size())); + + ++wcount; + proc.enableNotifications(0, false); + + return (wcount == lineCount); + }; + + auto readCallback = [&] (int pfd, int cfd) -> bool { + EXPECT_EQ(1, cfd); // child stdout + EXPECT_EQ(wcount, rcount + 1); + + auto expected = + folly::to("a successful test ", rcount, "\n"); + + std::string lineBuf; + + // Not entirely kosher, we should handle partial reads, but this is + // fine for reads <= PIPE_BUF + bool r = readToString(pfd, lineBuf, expected.size() + 1); + + EXPECT_EQ((rcount == lineCount), r); // EOF iff at lineCount + EXPECT_EQ(expected, lineBuf); + + ++rcount; + if (rcount != lineCount) { + proc.enableNotifications(0, true); + } + + return (rcount == lineCount); + }; + + proc.communicate(readCallback, writeCallback); + + EXPECT_EQ(lineCount, wcount); + EXPECT_EQ(lineCount, rcount); + + EXPECT_EQ(0, proc.wait().exitStatus()); + }); +} -- 2.34.1