} // namespace
std::pair<std::string, std::string> Subprocess::communicate(
- const CommunicateFlags& flags,
- StringPiece data) {
- IOBufQueue dataQueue;
- dataQueue.wrapBuffer(data.data(), data.size());
+ StringPiece input) {
+ IOBufQueue inputQueue;
+ inputQueue.wrapBuffer(input.data(), input.size());
- auto outQueues = communicateIOBuf(flags, std::move(dataQueue));
+ auto outQueues = communicateIOBuf(std::move(inputQueue));
auto outBufs = std::make_pair(outQueues.first.move(),
outQueues.second.move());
std::pair<std::string, std::string> out;
}
std::pair<IOBufQueue, IOBufQueue> Subprocess::communicateIOBuf(
- const CommunicateFlags& flags,
- IOBufQueue data) {
+ IOBufQueue input) {
+ // If the user supplied a non-empty input buffer, make sure
+ // that stdin is a pipe so we can write the data.
+ if (!input.empty()) {
+ // findByChildFd() will throw std::invalid_argument if no pipe for
+ // STDIN_FILENO exists
+ findByChildFd(STDIN_FILENO);
+ }
+
std::pair<IOBufQueue, IOBufQueue> out;
auto readCallback = [&] (int pfd, int cfd) -> bool {
- if (cfd == 1 && flags.readStdout_) {
+ if (cfd == STDOUT_FILENO) {
return handleRead(pfd, out.first);
- } else if (cfd == 2 && flags.readStderr_) {
+ } else if (cfd == STDERR_FILENO) {
return handleRead(pfd, out.second);
} else {
// Don't close the file descriptor, the child might not like SIGPIPE,
};
auto writeCallback = [&] (int pfd, int cfd) -> bool {
- if (cfd == 0 && flags.writeStdin_) {
- return handleWrite(pfd, data);
+ if (cfd == STDIN_FILENO) {
+ return handleWrite(pfd, input);
} else {
// If we don't want to write to this fd, just close it.
- return false;
+ return true;
}
};
/**
* Shortcut to change the action for standard input.
*/
- Options& stdin(int action) { return fd(0, action); }
+ Options& stdin(int action) { return fd(STDIN_FILENO, action); }
/**
* Shortcut to change the action for standard output.
*/
- Options& stdout(int action) { return fd(1, action); }
+ Options& stdout(int action) { return fd(STDOUT_FILENO, action); }
/**
* Shortcut to change the action for standard error.
* Note that stderr(1) will redirect the standard error to the same
* file descriptor as standard output; the equivalent of bash's "2>&1"
*/
- Options& stderr(int action) { return fd(2, action); }
+ Options& stderr(int action) { return fd(STDERR_FILENO, action); }
+
+ Options& pipeStdin() { return fd(STDIN_FILENO, PIPE_IN); }
+ Options& pipeStdout() { return fd(STDOUT_FILENO, PIPE_OUT); }
+ Options& pipeStderr() { return fd(STDERR_FILENO, PIPE_OUT); }
/**
* Close all other fds (other than standard input, output, error,
const std::vector<std::string>* env = nullptr);
/**
- * Append all data, close the stdin (to-child) fd, and read all data,
- * except that this is done in a safe manner to prevent deadlocking.
+ * Communicate with the child until all pipes to/from the child are closed.
*
- * If writeStdin() is given in flags, the process must have been opened with
- * stdinFd=PIPE.
+ * The input buffer is written to the process' stdin pipe, and data is read
+ * from the stdout and stderr pipes. Non-blocking I/O is performed on all
+ * pipes simultaneously to avoid deadlocks.
*
- * If readStdout() is given in flags, the first returned value will be the
- * value read from the child's stdout; the child must have been opened with
- * stdoutFd=PIPE.
+ * The stdin pipe will be closed after the full input buffer has been written.
+ * An error will be thrown if a non-empty input buffer is supplied but stdin
+ * was not configured as a pipe.
*
- * If readStderr() is given in flags, the second returned value will be the
- * value read from the child's stderr; the child must have been opened with
- * stderrFd=PIPE.
+ * Returns a pair of buffers containing the data read from stdout and stderr.
+ * If stdout or stderr is not a pipe, an empty IOBuf queue will be returned
+ * for the respective buffer.
*
* Note that communicate() returns when all pipes to/from the child are
* closed; the child might stay alive after that, so you must still wait().
* that it won't try to allocate all data at once). communicate
* uses strings for simplicity.
*/
- class CommunicateFlags : private boost::orable<CommunicateFlags> {
- friend class Subprocess;
- public:
- CommunicateFlags()
- : writeStdin_(false), readStdout_(false), readStderr_(false) { }
- CommunicateFlags& writeStdin() { writeStdin_ = true; return *this; }
- CommunicateFlags& readStdout() { readStdout_ = true; return *this; }
- CommunicateFlags& readStderr() { readStderr_ = true; return *this; }
-
- CommunicateFlags& operator|=(const CommunicateFlags& other);
- private:
- bool writeStdin_;
- bool readStdout_;
- bool readStderr_;
- };
-
- static CommunicateFlags writeStdin() {
- return CommunicateFlags().writeStdin();
- }
- static CommunicateFlags readStdout() {
- return CommunicateFlags().readStdout();
- }
- static CommunicateFlags readStderr() {
- return CommunicateFlags().readStderr();
- }
-
std::pair<IOBufQueue, IOBufQueue> communicateIOBuf(
- const CommunicateFlags& flags = readStdout(),
- IOBufQueue data = IOBufQueue());
+ IOBufQueue input = IOBufQueue());
std::pair<std::string, std::string> communicate(
- const CommunicateFlags& flags = readStdout(),
- StringPiece data = StringPiece());
+ StringPiece input = StringPiece());
/**
* Communicate with the child until all pipes to/from the child are closed.
return *this;
}
-inline Subprocess::CommunicateFlags& Subprocess::CommunicateFlags::operator|=(
- const Subprocess::CommunicateFlags& other) {
- if (this == &other) return *this;
- writeStdin_ |= other.writeStdin_;
- readStdout_ |= other.readStdout_;
- readStderr_ |= other.readStderr_;
- return *this;
-}
-
} // namespace folly
#endif /* FOLLY_SUBPROCESS_H_ */
return chainLength_;
}
+ /**
+ * Returns true iff the IOBuf chain length is 0.
+ */
+ bool empty() const {
+ return !head_ || head_->empty();
+ }
+
const Options& options() const {
return options_;
}
#include "folly/Exception.h"
#include "folly/Format.h"
+#include "folly/String.h"
#include "folly/experimental/Gen.h"
#include "folly/experimental/FileGen.h"
#include "folly/experimental/StringGen.h"
checkFdLeak([] {
Subprocess proc("echo foo; echo bar >&2",
Subprocess::pipeStdout() | Subprocess::pipeStderr());
- auto p = proc.communicate(Subprocess::readStdout() |
- Subprocess::readStderr());
+ auto p = proc.communicate();
EXPECT_EQ("foo\n", p.first);
EXPECT_EQ("bar\n", p.second);
proc.waitChecked();
}
Subprocess proc("wc -l", Subprocess::pipeStdin() | Subprocess::pipeStdout());
- auto p = proc.communicate(Subprocess::writeStdin() | Subprocess::readStdout(),
- data);
+ auto p = proc.communicate(data);
EXPECT_EQ(folly::format("{}\n", numLines).str(), p.first);
proc.waitChecked();
}
Subprocess proc("tr a-z A-Z",
Subprocess::pipeStdin() | Subprocess::pipeStdout());
- auto p = proc.communicate(Subprocess::writeStdin() | Subprocess::readStdout(),
- line);
+ auto p = proc.communicate(line);
EXPECT_EQ(bytes, p.first.size());
EXPECT_EQ(std::string::npos, p.first.find_first_not_of('X'));
proc.waitChecked();
}
+
+TEST(CommunicateSubprocessTest, Duplex2) {
+ checkFdLeak([] {
+ // Pipe 200,000 lines through sed
+ const size_t numCopies = 100000;
+ auto iobuf = IOBuf::copyBuffer("this is a test\nanother line\n");
+ IOBufQueue input;
+ for (int n = 0; n < numCopies; ++n) {
+ input.append(iobuf->clone());
+ }
+
+ std::vector<std::string> cmd({
+ "sed", "-u",
+ "-e", "s/a test/a successful test/",
+ "-e", "/^another line/w/dev/stderr",
+ });
+ auto options = Subprocess::pipeStdin().pipeStdout().pipeStderr().usePath();
+ Subprocess proc(cmd, options);
+ auto out = proc.communicateIOBuf(std::move(input));
+ proc.waitChecked();
+
+ // Convert stdout and stderr to strings so we can call split() on them.
+ fbstring stdoutStr;
+ if (out.first.front()) {
+ stdoutStr = out.first.move()->moveToFbString();
+ }
+ fbstring stderrStr;
+ if (out.second.front()) {
+ stderrStr = out.second.move()->moveToFbString();
+ }
+
+ // stdout should be a copy of stdin, with "a test" replaced by
+ // "a successful test"
+ std::vector<StringPiece> stdoutLines;
+ split('\n', stdoutStr, stdoutLines);
+ EXPECT_EQ(numCopies * 2 + 1, stdoutLines.size());
+ // Strip off the trailing empty line
+ if (!stdoutLines.empty()) {
+ EXPECT_EQ("", stdoutLines.back());
+ stdoutLines.pop_back();
+ }
+ size_t linenum = 0;
+ for (const auto& line : stdoutLines) {
+ if ((linenum & 1) == 0) {
+ EXPECT_EQ("this is a successful test", line);
+ } else {
+ EXPECT_EQ("another line", line);
+ }
+ ++linenum;
+ }
+
+ // stderr should only contain the lines containing "another line"
+ std::vector<StringPiece> stderrLines;
+ split('\n', stderrStr, stderrLines);
+ EXPECT_EQ(numCopies + 1, stderrLines.size());
+ // Strip off the trailing empty line
+ if (!stderrLines.empty()) {
+ EXPECT_EQ("", stderrLines.back());
+ stderrLines.pop_back();
+ }
+ for (const auto& line : stderrLines) {
+ EXPECT_EQ("another line", line);
+ }
+ });
+}