Add ability to silence callbacks for Subprocess::communicate
authorTudor Bosman <tudorb@fb.com>
Tue, 1 Apr 2014 17:36:27 +0000 (10:36 -0700)
committerSara Golemon <sgolemon@fb.com>
Fri, 4 Apr 2014 02:54:15 +0000 (19:54 -0700)
Summary:
Subprocess::communicate callbacks are level-triggered, which makes writing
"chatty" communication protocols difficult -- you often want to silence the
write callback until you read the expected message. Add
Subprocess::enableNotifications() for this purpose.

Test Plan: test added

Reviewed By: lucian@fb.com

FB internal diff: D1251564

@override-unit-failures

folly/Subprocess.cpp
folly/Subprocess.h
folly/test/SubprocessTest.cpp

index 96049d65dc086f37e9018877d09fea9729e2160b..b5f3e1dba7ffe6e1afd09e3915e06c9e545761b1 100644 (file)
@@ -700,7 +700,15 @@ void Subprocess::communicate(FdCallback readCallback,
       pfd.fd = p.parentFd;
       // Yes, backwards, PIPE_IN / PIPE_OUT are defined from the
       // child's point of view.
-      pfd.events = (p.direction == PIPE_IN ?  POLLOUT : POLLIN);
+      if (!p.enabled) {
+        // Still keeping fd in watched set so we get notified of POLLHUP /
+        // POLLERR
+        pfd.events = 0;
+      } else if (p.direction == PIPE_IN) {
+        pfd.events = POLLOUT;
+      } else {
+        pfd.events = POLLIN;
+      }
       fds.push_back(pfd);
     }
 
@@ -747,6 +755,14 @@ void Subprocess::communicate(FdCallback readCallback,
   }
 }
 
+void Subprocess::enableNotifications(int childFd, bool enabled) {
+  pipes_[findByChildFd(childFd)].enabled = enabled;
+}
+
+bool Subprocess::notificationsEnabled(int childFd) const {
+  return pipes_[findByChildFd(childFd)].enabled;
+}
+
 int Subprocess::findByChildFd(int childFd) const {
   auto pos = std::lower_bound(
       pipes_.begin(), pipes_.end(), childFd,
index c0b69e460ad602d8822f2ceb2557b8d969e4f2bb..d3a9fb8ec5240b206845ca52b97ec5e378440249 100644 (file)
@@ -383,6 +383,18 @@ class Subprocess : private boost::noncopyable {
   typedef std::function<bool(int, int)> FdCallback;
   void communicate(FdCallback readCallback, FdCallback writeCallback);
 
+  /**
+   * Enable notifications (callbacks) for one pipe to/from child. By default,
+   * all are enabled. Useful for "chatty" communication -- you want to disable
+   * write callbacks until you receive the expected message.
+   */
+  void enableNotifications(int childFd, bool enabled);
+
+  /**
+   * Are notifications for one pipe to/from child enabled?
+   */
+  bool notificationsEnabled(int childFd) const;
+
   /**
    * Return the child's pid, or -1 if the child wasn't successfully spawned
    * or has already been wait()ed upon.
@@ -504,9 +516,11 @@ class Subprocess : private boost::noncopyable {
   // so we're happy with a vector here, even if it means linear erase.
   // sorted by childFd
   struct PipeInfo : private boost::totally_ordered<PipeInfo> {
-    int parentFd;
-    int childFd;
-    int direction;  // one of PIPE_IN / PIPE_OUT
+    int parentFd = -1;
+    int childFd = -1;
+    int direction = PIPE_IN;  // one of PIPE_IN / PIPE_OUT
+    bool enabled = true;
+
     bool operator<(const PipeInfo& other) const {
       return childFd < other.childFd;
     }
index f17f3a51da37dda83886694e0777058542bb0c50..9be77f2e6a030f4b3b3aee6452d3aa35107a4c9e 100644 (file)
@@ -26,6 +26,7 @@
 
 #include "folly/Exception.h"
 #include "folly/Format.h"
+#include "folly/FileUtil.h"
 #include "folly/String.h"
 #include "folly/gen/Base.h"
 #include "folly/gen/File.h"
@@ -292,3 +293,102 @@ TEST(CommunicateSubprocessTest, Duplex2) {
     }
   });
 }
+
+namespace {
+
+bool readToString(int fd, std::string& buf, size_t maxSize) {
+  size_t bytesRead = 0;
+
+  buf.resize(maxSize);
+  char* dest = &buf.front();
+  size_t remaining = maxSize;
+
+  ssize_t n = -1;
+  while (remaining) {
+    n = ::read(fd, dest, remaining);
+    if (n == -1) {
+      if (errno == EINTR) {
+        continue;
+      }
+      if (errno == EAGAIN) {
+        break;
+      }
+      PCHECK("read failed");
+    } else if (n == 0) {
+      break;
+    }
+    dest += n;
+    remaining -= n;
+  }
+
+  buf.resize(dest - buf.data());
+  return (n == 0);
+}
+
+}  // namespace
+
+TEST(CommunicateSubprocessTest, Chatty) {
+  checkFdLeak([] {
+    const int lineCount = 1000;
+
+    int wcount = 0;
+    int rcount = 0;
+
+    auto options = Subprocess::pipeStdin().pipeStdout().pipeStderr().usePath();
+    std::vector<std::string> cmd {
+      "sed",
+      "-u",
+      "-e",
+      "s/a test/a successful test/",
+    };
+
+    Subprocess proc(cmd, options);
+
+    auto writeCallback = [&] (int pfd, int cfd) -> bool {
+      EXPECT_EQ(0, cfd);  // child stdin
+      EXPECT_EQ(rcount, wcount);  // chatty, one read for every write
+
+      auto msg = folly::to<std::string>("a test ", wcount, "\n");
+
+      // Not entirely kosher, we should handle partial writes, but this is
+      // fine for writes <= PIPE_BUF
+      EXPECT_EQ(msg.size(), writeFull(pfd, msg.data(), msg.size()));
+
+      ++wcount;
+      proc.enableNotifications(0, false);
+
+      return (wcount == lineCount);
+    };
+
+    auto readCallback = [&] (int pfd, int cfd) -> bool {
+      EXPECT_EQ(1, cfd);  // child stdout
+      EXPECT_EQ(wcount, rcount + 1);
+
+      auto expected =
+        folly::to<std::string>("a successful test ", rcount, "\n");
+
+      std::string lineBuf;
+
+      // Not entirely kosher, we should handle partial reads, but this is
+      // fine for reads <= PIPE_BUF
+      bool r = readToString(pfd, lineBuf, expected.size() + 1);
+
+      EXPECT_EQ((rcount == lineCount), r);  // EOF iff at lineCount
+      EXPECT_EQ(expected, lineBuf);
+
+      ++rcount;
+      if (rcount != lineCount) {
+        proc.enableNotifications(0, true);
+      }
+
+      return (rcount == lineCount);
+    };
+
+    proc.communicate(readCallback, writeCallback);
+
+    EXPECT_EQ(lineCount, wcount);
+    EXPECT_EQ(lineCount, rcount);
+
+    EXPECT_EQ(0, proc.wait().exitStatus());
+  });
+}