pfd.fd = p.parentFd;
// Yes, backwards, PIPE_IN / PIPE_OUT are defined from the
// child's point of view.
- pfd.events = (p.direction == PIPE_IN ? POLLOUT : POLLIN);
+ if (!p.enabled) {
+ // Still keeping fd in watched set so we get notified of POLLHUP /
+ // POLLERR
+ pfd.events = 0;
+ } else if (p.direction == PIPE_IN) {
+ pfd.events = POLLOUT;
+ } else {
+ pfd.events = POLLIN;
+ }
fds.push_back(pfd);
}
}
}
+void Subprocess::enableNotifications(int childFd, bool enabled) {
+ pipes_[findByChildFd(childFd)].enabled = enabled;
+}
+
+bool Subprocess::notificationsEnabled(int childFd) const {
+ return pipes_[findByChildFd(childFd)].enabled;
+}
+
int Subprocess::findByChildFd(int childFd) const {
auto pos = std::lower_bound(
pipes_.begin(), pipes_.end(), childFd,
typedef std::function<bool(int, int)> FdCallback;
void communicate(FdCallback readCallback, FdCallback writeCallback);
+ /**
+ * Enable notifications (callbacks) for one pipe to/from child. By default,
+ * all are enabled. Useful for "chatty" communication -- you want to disable
+ * write callbacks until you receive the expected message.
+ */
+ void enableNotifications(int childFd, bool enabled);
+
+ /**
+ * Are notifications for one pipe to/from child enabled?
+ */
+ bool notificationsEnabled(int childFd) const;
+
/**
* Return the child's pid, or -1 if the child wasn't successfully spawned
* or has already been wait()ed upon.
// so we're happy with a vector here, even if it means linear erase.
// sorted by childFd
struct PipeInfo : private boost::totally_ordered<PipeInfo> {
- int parentFd;
- int childFd;
- int direction; // one of PIPE_IN / PIPE_OUT
+ int parentFd = -1;
+ int childFd = -1;
+ int direction = PIPE_IN; // one of PIPE_IN / PIPE_OUT
+ bool enabled = true;
+
bool operator<(const PipeInfo& other) const {
return childFd < other.childFd;
}
#include "folly/Exception.h"
#include "folly/Format.h"
+#include "folly/FileUtil.h"
#include "folly/String.h"
#include "folly/gen/Base.h"
#include "folly/gen/File.h"
}
});
}
+
+namespace {
+
+bool readToString(int fd, std::string& buf, size_t maxSize) {
+ size_t bytesRead = 0;
+
+ buf.resize(maxSize);
+ char* dest = &buf.front();
+ size_t remaining = maxSize;
+
+ ssize_t n = -1;
+ while (remaining) {
+ n = ::read(fd, dest, remaining);
+ if (n == -1) {
+ if (errno == EINTR) {
+ continue;
+ }
+ if (errno == EAGAIN) {
+ break;
+ }
+ PCHECK("read failed");
+ } else if (n == 0) {
+ break;
+ }
+ dest += n;
+ remaining -= n;
+ }
+
+ buf.resize(dest - buf.data());
+ return (n == 0);
+}
+
+} // namespace
+
+TEST(CommunicateSubprocessTest, Chatty) {
+ checkFdLeak([] {
+ const int lineCount = 1000;
+
+ int wcount = 0;
+ int rcount = 0;
+
+ auto options = Subprocess::pipeStdin().pipeStdout().pipeStderr().usePath();
+ std::vector<std::string> cmd {
+ "sed",
+ "-u",
+ "-e",
+ "s/a test/a successful test/",
+ };
+
+ Subprocess proc(cmd, options);
+
+ auto writeCallback = [&] (int pfd, int cfd) -> bool {
+ EXPECT_EQ(0, cfd); // child stdin
+ EXPECT_EQ(rcount, wcount); // chatty, one read for every write
+
+ auto msg = folly::to<std::string>("a test ", wcount, "\n");
+
+ // Not entirely kosher, we should handle partial writes, but this is
+ // fine for writes <= PIPE_BUF
+ EXPECT_EQ(msg.size(), writeFull(pfd, msg.data(), msg.size()));
+
+ ++wcount;
+ proc.enableNotifications(0, false);
+
+ return (wcount == lineCount);
+ };
+
+ auto readCallback = [&] (int pfd, int cfd) -> bool {
+ EXPECT_EQ(1, cfd); // child stdout
+ EXPECT_EQ(wcount, rcount + 1);
+
+ auto expected =
+ folly::to<std::string>("a successful test ", rcount, "\n");
+
+ std::string lineBuf;
+
+ // Not entirely kosher, we should handle partial reads, but this is
+ // fine for reads <= PIPE_BUF
+ bool r = readToString(pfd, lineBuf, expected.size() + 1);
+
+ EXPECT_EQ((rcount == lineCount), r); // EOF iff at lineCount
+ EXPECT_EQ(expected, lineBuf);
+
+ ++rcount;
+ if (rcount != lineCount) {
+ proc.enableNotifications(0, true);
+ }
+
+ return (rcount == lineCount);
+ };
+
+ proc.communicate(readCallback, writeCallback);
+
+ EXPECT_EQ(lineCount, wcount);
+ EXPECT_EQ(lineCount, rcount);
+
+ EXPECT_EQ(0, proc.wait().exitStatus());
+ });
+}