rework the Subprocess::communicate() API
authorAdam Simpkins <simpkins@fb.com>
Thu, 16 May 2013 02:03:40 +0000 (19:03 -0700)
committerJordan DeLong <jdelong@fb.com>
Wed, 26 Jun 2013 02:45:53 +0000 (19:45 -0700)
Summary:
This eliminates the CommunicateFlags argument to communicate().  It now
always reads from both stdout and stderr if they were set up a pipes.
If a non-empty input buffer was supplied, it always writes that to
stdin.

This mimics the communicate() behavior of python's subprocess.py module.
This also makes it impossible to have buffering deadlocks by forgetting
to call communicate() with readStderr().

Test Plan:
Ran the existing subprocess tests, and also added a more complicated
duplex test that requires communication on stdin, stdout, and stderr all
at the same time.

Also grepped for all existing users of Subprocess::communicate(), and
made sure they will work correctly with the new behavior.

Reviewed By: tudorb@fb.com

FB internal diff: D814405

folly/Subprocess.cpp
folly/Subprocess.h
folly/io/IOBufQueue.h
folly/test/SubprocessTest.cpp

index 11026c29d4f72dd3ad20db3d18a92f0355a03fa8..fb4c19a95ee9bb1e998b426449cb2f1709112751 100644 (file)
@@ -617,12 +617,11 @@ bool discardRead(int fd) {
 }  // namespace
 
 std::pair<std::string, std::string> Subprocess::communicate(
-    const CommunicateFlags& flags,
-    StringPiece data) {
-  IOBufQueue dataQueue;
-  dataQueue.wrapBuffer(data.data(), data.size());
+    StringPiece input) {
+  IOBufQueue inputQueue;
+  inputQueue.wrapBuffer(input.data(), input.size());
 
-  auto outQueues = communicateIOBuf(flags, std::move(dataQueue));
+  auto outQueues = communicateIOBuf(std::move(inputQueue));
   auto outBufs = std::make_pair(outQueues.first.move(),
                                 outQueues.second.move());
   std::pair<std::string, std::string> out;
@@ -640,14 +639,21 @@ std::pair<std::string, std::string> Subprocess::communicate(
 }
 
 std::pair<IOBufQueue, IOBufQueue> Subprocess::communicateIOBuf(
-    const CommunicateFlags& flags,
-    IOBufQueue data) {
+    IOBufQueue input) {
+  // If the user supplied a non-empty input buffer, make sure
+  // that stdin is a pipe so we can write the data.
+  if (!input.empty()) {
+    // findByChildFd() will throw std::invalid_argument if no pipe for
+    // STDIN_FILENO exists
+    findByChildFd(STDIN_FILENO);
+  }
+
   std::pair<IOBufQueue, IOBufQueue> out;
 
   auto readCallback = [&] (int pfd, int cfd) -> bool {
-    if (cfd == 1 && flags.readStdout_) {
+    if (cfd == STDOUT_FILENO) {
       return handleRead(pfd, out.first);
-    } else if (cfd == 2 && flags.readStderr_) {
+    } else if (cfd == STDERR_FILENO) {
       return handleRead(pfd, out.second);
     } else {
       // Don't close the file descriptor, the child might not like SIGPIPE,
@@ -657,11 +663,11 @@ std::pair<IOBufQueue, IOBufQueue> Subprocess::communicateIOBuf(
   };
 
   auto writeCallback = [&] (int pfd, int cfd) -> bool {
-    if (cfd == 0 && flags.writeStdin_) {
-      return handleWrite(pfd, data);
+    if (cfd == STDIN_FILENO) {
+      return handleWrite(pfd, input);
     } else {
       // If we don't want to write to this fd, just close it.
-      return false;
+      return true;
     }
   };
 
index 58c9c9c829c16c496e8fcf3a15fa807911d2940e..94cc408971495f79b6fb3d765b17bfc53b1f9520 100644 (file)
@@ -224,19 +224,23 @@ class Subprocess : private boost::noncopyable {
     /**
      * Shortcut to change the action for standard input.
      */
-    Options& stdin(int action) { return fd(0, action); }
+    Options& stdin(int action) { return fd(STDIN_FILENO, action); }
 
     /**
      * Shortcut to change the action for standard output.
      */
-    Options& stdout(int action) { return fd(1, action); }
+    Options& stdout(int action) { return fd(STDOUT_FILENO, action); }
 
     /**
      * Shortcut to change the action for standard error.
      * Note that stderr(1) will redirect the standard error to the same
      * file descriptor as standard output; the equivalent of bash's "2>&1"
      */
-    Options& stderr(int action) { return fd(2, action); }
+    Options& stderr(int action) { return fd(STDERR_FILENO, action); }
+
+    Options& pipeStdin() { return fd(STDIN_FILENO, PIPE_IN); }
+    Options& pipeStdout() { return fd(STDOUT_FILENO, PIPE_OUT); }
+    Options& pipeStderr() { return fd(STDERR_FILENO, PIPE_OUT); }
 
     /**
      * Close all other fds (other than standard input, output, error,
@@ -310,19 +314,19 @@ class Subprocess : private boost::noncopyable {
       const std::vector<std::string>* env = nullptr);
 
   /**
-   * Append all data, close the stdin (to-child) fd, and read all data,
-   * except that this is done in a safe manner to prevent deadlocking.
+   * Communicate with the child until all pipes to/from the child are closed.
    *
-   * If writeStdin() is given in flags, the process must have been opened with
-   * stdinFd=PIPE.
+   * The input buffer is written to the process' stdin pipe, and data is read
+   * from the stdout and stderr pipes.  Non-blocking I/O is performed on all
+   * pipes simultaneously to avoid deadlocks.
    *
-   * If readStdout() is given in flags, the first returned value will be the
-   * value read from the child's stdout; the child must have been opened with
-   * stdoutFd=PIPE.
+   * The stdin pipe will be closed after the full input buffer has been written.
+   * An error will be thrown if a non-empty input buffer is supplied but stdin
+   * was not configured as a pipe.
    *
-   * If readStderr() is given in flags, the second returned value will be the
-   * value read from the child's stderr; the child must have been opened with
-   * stderrFd=PIPE.
+   * Returns a pair of buffers containing the data read from stdout and stderr.
+   * If stdout or stderr is not a pipe, an empty IOBuf queue will be returned
+   * for the respective buffer.
    *
    * Note that communicate() returns when all pipes to/from the child are
    * closed; the child might stay alive after that, so you must still wait().
@@ -331,39 +335,11 @@ class Subprocess : private boost::noncopyable {
    * that it won't try to allocate all data at once).  communicate
    * uses strings for simplicity.
    */
-  class CommunicateFlags : private boost::orable<CommunicateFlags> {
-    friend class Subprocess;
-   public:
-    CommunicateFlags()
-      : writeStdin_(false), readStdout_(false), readStderr_(false) { }
-    CommunicateFlags& writeStdin() { writeStdin_ = true; return *this; }
-    CommunicateFlags& readStdout() { readStdout_ = true; return *this; }
-    CommunicateFlags& readStderr() { readStderr_ = true; return *this; }
-
-    CommunicateFlags& operator|=(const CommunicateFlags& other);
-   private:
-    bool writeStdin_;
-    bool readStdout_;
-    bool readStderr_;
-  };
-
-  static CommunicateFlags writeStdin() {
-    return CommunicateFlags().writeStdin();
-  }
-  static CommunicateFlags readStdout() {
-    return CommunicateFlags().readStdout();
-  }
-  static CommunicateFlags readStderr() {
-    return CommunicateFlags().readStderr();
-  }
-
   std::pair<IOBufQueue, IOBufQueue> communicateIOBuf(
-      const CommunicateFlags& flags = readStdout(),
-      IOBufQueue data = IOBufQueue());
+      IOBufQueue input = IOBufQueue());
 
   std::pair<std::string, std::string> communicate(
-      const CommunicateFlags& flags = readStdout(),
-      StringPiece data = StringPiece());
+      StringPiece input = StringPiece());
 
   /**
    * Communicate with the child until all pipes to/from the child are closed.
@@ -546,15 +522,6 @@ inline Subprocess::Options& Subprocess::Options::operator|=(
   return *this;
 }
 
-inline Subprocess::CommunicateFlags& Subprocess::CommunicateFlags::operator|=(
-    const Subprocess::CommunicateFlags& other) {
-  if (this == &other) return *this;
-  writeStdin_ |= other.writeStdin_;
-  readStdout_ |= other.readStdout_;
-  readStderr_ |= other.readStderr_;
-  return *this;
-}
-
 }  // namespace folly
 
 #endif /* FOLLY_SUBPROCESS_H_ */
index 2a63fbf91fe57bc2b96fd34a09daeae77ad0cf6b..71966f49cb1819f5cc332cf9b4e8a4253bfa4c27 100644 (file)
@@ -222,6 +222,13 @@ class IOBufQueue {
     return chainLength_;
   }
 
+  /**
+   * Returns true iff the IOBuf chain length is 0.
+   */
+  bool empty() const {
+    return !head_ || head_->empty();
+  }
+
   const Options& options() const {
     return options_;
   }
index 112e9aae1c45504ec24b9e12ef36d1b40841292e..769eec572c1ae48c275f59efbc8dfefbefc3ed84 100644 (file)
@@ -26,6 +26,7 @@
 
 #include "folly/Exception.h"
 #include "folly/Format.h"
+#include "folly/String.h"
 #include "folly/experimental/Gen.h"
 #include "folly/experimental/FileGen.h"
 #include "folly/experimental/StringGen.h"
@@ -126,8 +127,7 @@ TEST(SimpleSubprocessTest, FdLeakTest) {
   checkFdLeak([] {
     Subprocess proc("echo foo; echo bar >&2",
                     Subprocess::pipeStdout() | Subprocess::pipeStderr());
-    auto p = proc.communicate(Subprocess::readStdout() |
-                              Subprocess::readStderr());
+    auto p = proc.communicate();
     EXPECT_EQ("foo\n", p.first);
     EXPECT_EQ("bar\n", p.second);
     proc.waitChecked();
@@ -209,8 +209,7 @@ TEST(CommunicateSubprocessTest, BigWrite) {
   }
 
   Subprocess proc("wc -l", Subprocess::pipeStdin() | Subprocess::pipeStdout());
-  auto p = proc.communicate(Subprocess::writeStdin() | Subprocess::readStdout(),
-                            data);
+  auto p = proc.communicate(data);
   EXPECT_EQ(folly::format("{}\n", numLines).str(), p.first);
   proc.waitChecked();
 }
@@ -223,9 +222,73 @@ TEST(CommunicateSubprocessTest, Duplex) {
 
   Subprocess proc("tr a-z A-Z",
                   Subprocess::pipeStdin() | Subprocess::pipeStdout());
-  auto p = proc.communicate(Subprocess::writeStdin() | Subprocess::readStdout(),
-                            line);
+  auto p = proc.communicate(line);
   EXPECT_EQ(bytes, p.first.size());
   EXPECT_EQ(std::string::npos, p.first.find_first_not_of('X'));
   proc.waitChecked();
 }
+
+TEST(CommunicateSubprocessTest, Duplex2) {
+  checkFdLeak([] {
+    // Pipe 200,000 lines through sed
+    const size_t numCopies = 100000;
+    auto iobuf = IOBuf::copyBuffer("this is a test\nanother line\n");
+    IOBufQueue input;
+    for (int n = 0; n < numCopies; ++n) {
+      input.append(iobuf->clone());
+    }
+
+    std::vector<std::string> cmd({
+      "sed", "-u",
+      "-e", "s/a test/a successful test/",
+      "-e", "/^another line/w/dev/stderr",
+    });
+    auto options = Subprocess::pipeStdin().pipeStdout().pipeStderr().usePath();
+    Subprocess proc(cmd, options);
+    auto out = proc.communicateIOBuf(std::move(input));
+    proc.waitChecked();
+
+    // Convert stdout and stderr to strings so we can call split() on them.
+    fbstring stdoutStr;
+    if (out.first.front()) {
+      stdoutStr = out.first.move()->moveToFbString();
+    }
+    fbstring stderrStr;
+    if (out.second.front()) {
+      stderrStr = out.second.move()->moveToFbString();
+    }
+
+    // stdout should be a copy of stdin, with "a test" replaced by
+    // "a successful test"
+    std::vector<StringPiece> stdoutLines;
+    split('\n', stdoutStr, stdoutLines);
+    EXPECT_EQ(numCopies * 2 + 1, stdoutLines.size());
+    // Strip off the trailing empty line
+    if (!stdoutLines.empty()) {
+      EXPECT_EQ("", stdoutLines.back());
+      stdoutLines.pop_back();
+    }
+    size_t linenum = 0;
+    for (const auto& line : stdoutLines) {
+      if ((linenum & 1) == 0) {
+        EXPECT_EQ("this is a successful test", line);
+      } else {
+        EXPECT_EQ("another line", line);
+      }
+      ++linenum;
+    }
+
+    // stderr should only contain the lines containing "another line"
+    std::vector<StringPiece> stderrLines;
+    split('\n', stderrStr, stderrLines);
+    EXPECT_EQ(numCopies + 1, stderrLines.size());
+    // Strip off the trailing empty line
+    if (!stderrLines.empty()) {
+      EXPECT_EQ("", stderrLines.back());
+      stderrLines.pop_back();
+    }
+    for (const auto& line : stderrLines) {
+      EXPECT_EQ("another line", line);
+    }
+  });
+}