/*
- * Copyright 2013 Facebook, Inc.
+ * Copyright 2014 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* // read from proc.stdout()
* proc.wait();
*
- * A thread-safe version of popen() (type="w", to write from the child):
+ * A thread-safe version of popen() (type="w", to write to the child):
* Subprocess proc(cmd, Subprocess::pipeStdin());
* // write to proc.stdin()
* proc.wait();
#include <sys/types.h>
#include <signal.h>
+#if __APPLE__
+#include <sys/wait.h>
+#else
#include <wait.h>
+#endif
+#include <cerrno>
#include <exception>
+#include <system_error>
+#include <type_traits>
+#include <utility>
#include <vector>
#include <boost/operators.hpp>
#include <boost/noncopyable.hpp>
-#include "folly/io/IOBufQueue.h"
-#include "folly/MapUtil.h"
-#include "folly/Portability.h"
-#include "folly/Range.h"
+#include <folly/FileUtil.h>
+#include <folly/gen/String.h>
+#include <folly/io/IOBufQueue.h>
+#include <folly/MapUtil.h>
+#include <folly/Portability.h>
+#include <folly/Range.h>
namespace folly {
public:
Options()
: closeOtherFds_(false),
- usePath_(false),
- parentDeathSignal_(0) {
+ usePath_(false) {
}
/**
/**
* Shortcut to change the action for standard input.
*/
- Options& stdin(int action) { return fd(0, action); }
+ Options& stdin(int action) { return fd(STDIN_FILENO, action); }
/**
* Shortcut to change the action for standard output.
*/
- Options& stdout(int action) { return fd(1, action); }
+ Options& stdout(int action) { return fd(STDOUT_FILENO, action); }
/**
* Shortcut to change the action for standard error.
* Note that stderr(1) will redirect the standard error to the same
* file descriptor as standard output; the equivalent of bash's "2>&1"
*/
- Options& stderr(int action) { return fd(2, action); }
+ Options& stderr(int action) { return fd(STDERR_FILENO, action); }
+
+ Options& pipeStdin() { return fd(STDIN_FILENO, PIPE_IN); }
+ Options& pipeStdout() { return fd(STDOUT_FILENO, PIPE_OUT); }
+ Options& pipeStderr() { return fd(STDERR_FILENO, PIPE_OUT); }
/**
* Close all other fds (other than standard input, output, error,
*/
Options& usePath() { usePath_ = true; return *this; }
+ /**
+ * Change the child's working directory, after the vfork.
+ */
+ Options& chdir(const std::string& dir) { childDir_ = dir; return *this; }
+
+#if __linux__
/**
* Child will receive a signal when the parent exits.
*/
parentDeathSignal_ = sig;
return *this;
}
+#endif
/**
* Helpful way to combine Options.
FdMap fdActions_;
bool closeOtherFds_;
bool usePath_;
- int parentDeathSignal_;
+ std::string childDir_; // "" keeps the parent's working directory
+#if __linux__
+ int parentDeathSignal_{0};
+#endif
};
static Options pipeStdin() { return Options().stdin(PIPE); }
const std::vector<std::string>* env = nullptr);
/**
- * Append all data, close the stdin (to-child) fd, and read all data,
- * except that this is done in a safe manner to prevent deadlocking.
+ * Communicate with the child until all pipes to/from the child are closed.
*
- * If writeStdin() is given in flags, the process must have been opened with
- * stdinFd=PIPE.
+ * The input buffer is written to the process' stdin pipe, and data is read
+ * from the stdout and stderr pipes. Non-blocking I/O is performed on all
+ * pipes simultaneously to avoid deadlocks.
*
- * If readStdout() is given in flags, the first returned value will be the
- * value read from the child's stdout; the child must have been opened with
- * stdoutFd=PIPE.
+ * The stdin pipe will be closed after the full input buffer has been written.
+ * An error will be thrown if a non-empty input buffer is supplied but stdin
+ * was not configured as a pipe.
*
- * If readStderr() is given in flags, the second returned value will be the
- * value read from the child's stderr; the child must have been opened with
- * stderrFd=PIPE.
+ * Returns a pair of buffers containing the data read from stdout and stderr.
+ * If stdout or stderr is not a pipe, an empty IOBuf queue will be returned
+ * for the respective buffer.
*
- * Note that communicate() returns when all pipes to/from the child are
- * closed; the child might stay alive after that, so you must still wait().
+ * Note that communicate() and communicateIOBuf() both return when all
+ * pipes to/from the child are closed; the child might stay alive after
+ * that, so you must still wait().
+ *
+ * communicateIOBuf() uses IOBufQueue for buffering (which has the
+ * advantage that it won't try to allocate all data at once), but it does
+ * store the subprocess's entire output in memory before returning.
*
- * communicateIOBuf uses IOBufQueue for buffering (which has the advantage
- * that it won't try to allocate all data at once). communicate
- * uses strings for simplicity.
+ * communicate() uses strings for simplicity.
*/
- class CommunicateFlags : private boost::orable<CommunicateFlags> {
- friend class Subprocess;
- public:
- CommunicateFlags()
- : writeStdin_(false), readStdout_(false), readStderr_(false) { }
- CommunicateFlags& writeStdin() { writeStdin_ = true; return *this; }
- CommunicateFlags& readStdout() { readStdout_ = true; return *this; }
- CommunicateFlags& readStderr() { readStderr_ = true; return *this; }
-
- CommunicateFlags& operator|=(const CommunicateFlags& other);
- private:
- bool writeStdin_;
- bool readStdout_;
- bool readStderr_;
- };
-
- static CommunicateFlags writeStdin() {
- return CommunicateFlags().writeStdin();
- }
- static CommunicateFlags readStdout() {
- return CommunicateFlags().readStdout();
- }
- static CommunicateFlags readStderr() {
- return CommunicateFlags().readStderr();
- }
-
std::pair<IOBufQueue, IOBufQueue> communicateIOBuf(
- const CommunicateFlags& flags = readStdout(),
- IOBufQueue data = IOBufQueue());
+ IOBufQueue input = IOBufQueue());
std::pair<std::string, std::string> communicate(
- const CommunicateFlags& flags = readStdout(),
- StringPiece data = StringPiece());
+ StringPiece input = StringPiece());
/**
* Communicate with the child until all pipes to/from the child are closed.
* identifying the stream; 0 = child's standard input, etc)
*
* The read and write callbacks must read from / write to pfd and return
- * false during normal operation or true at end-of-file;
- * communicate() will then close the pipe. Note that pfd is
- * nonblocking, so be prepared for read() / write() to return -1 and
- * set errno to EAGAIN (in which case you should return false).
+ * false during normal operation. Return true to tell communicate() to
+ * close the pipe. For readCallback, this might send SIGPIPE to the
+ * child, or make its writes fail with EPIPE, so you should generally
+ * avoid returning true unless you've reached end-of-file.
*
* NOTE that you MUST consume all data passed to readCallback (or return
- * true, which will close the pipe, possibly sending SIGPIPE to the child or
- * making its writes fail with EPIPE), and you MUST write to a writable pipe
- * (or return true, which will close the pipe). To do otherwise is an
- * error. You must do this even for pipes you are not interested in.
+ * true to close the pipe). Similarly, you MUST write to a writable pipe
+ * (or return true to close the pipe). To do otherwise is an error that
+ * can result in a deadlock. You must do this even for pipes you are not
+ * interested in.
+ *
+ * Note that pfd is nonblocking, so be prepared for read() / write() to
+ * return -1 and set errno to EAGAIN (in which case you should return
+ * false). Use readNoInt() from FileUtil.h to handle interrupted reads
+ * for you.
*
* Note that communicate() returns when all pipes to/from the child are
* closed; the child might stay alive after that, so you must still wait().
*
* Most users won't need to use this; the simpler version of communicate
* (which buffers data in memory) will probably work fine.
+ *
+ * See ReadLinesCallback for an easy way to consume the child's output
+ * streams line-by-line (or tokenized by another delimiter).
*/
typedef std::function<bool(int, int)> FdCallback;
void communicate(FdCallback readCallback, FdCallback writeCallback);
+ /**
+ * A readCallback for Subprocess::communicate() that helps you consume
+ * lines (or other delimited pieces) from your subprocess's file
+ * descriptors. Use the readLinesCallback() helper to get template
+ * deduction. For example:
+ *
+ * auto read_cb = Subprocess::readLinesCallback(
+ * [](int fd, folly::StringPiece s) {
+ * std::cout << fd << " said: " << s;
+ * return false; // Keep reading from the child
+ * }
+ * );
+ * subprocess.communicate(
+ * // ReadLinesCallback contains StreamSplitter contains IOBuf, making
+ * // it noncopyable, whereas std::function must be copyable. So, we
+ * // keep the callback in a local, and instead pass a reference.
+ * std::ref(read_cb),
+ * [](int pfd, int cfd){ return true; } // Don't write to the child
+ * );
+ *
+ * If a file line exceeds maxLineLength, your callback will get some
+ * initial chunks of maxLineLength with no trailing delimiters. The final
+ * chunk of a line is delimiter-terminated iff the delimiter was present
+ * in the input. In particular, the last line in a file always lacks a
+ * delimiter -- so if a file ends on a delimiter, the final line is empty.
+ *
+ * Like a regular communicate() callback, your fdLineCb() normally returns
+ * false. It may return true to tell Subprocess to close the underlying
+ * file descriptor. The child process may then receive SIGPIPE or get
+ * EPIPE errors on writes.
+ */
+  template <class Callback>
+  class ReadLinesCallback {
+   private:
+    // Binds an FD to the client-provided FD+line callback.  Only a reference
+    // to the callback is stored; it refers to the fdLineCb_ member of the
+    // ReadLinesCallback that created the splitter.
+    struct StreamSplitterCallback {
+      StreamSplitterCallback(Callback& cb, int fd) : cb_(cb), fd_(fd) { }
+      // The return value semantics are inverted vs StreamSplitter
+      bool operator()(StringPiece s) { return !cb_(fd_, s); }
+      Callback& cb_;
+      int fd_;
+    };
+    typedef gen::StreamSplitter<StreamSplitterCallback> LineSplitter;
+
+   public:
+    /**
+     * fdLineCb is invoked as fdLineCb(cfd, piece) for every delimited piece;
+     * it returns false to keep reading and true to stop.
+     *
+     * maxLineLength, delimiter and bufSize are stored as given; bufSize is
+     * the number of bytes requested from each read.
+     */
+    explicit ReadLinesCallback(
+        Callback&& fdLineCb,
+        uint64_t maxLineLength = 0,  // No line length limit by default
+        char delimiter = '\n',
+        uint64_t bufSize = 1024
+    ) : fdLineCb_(std::move(fdLineCb)),
+        maxLineLength_(maxLineLength),
+        delimiter_(delimiter),
+        bufSize_(bufSize) {}
+
+    /**
+     * Read callback for communicate(): reads from pfd until it would block,
+     * feeding the bytes to the splitter kept for cfd.
+     *
+     * Returns false when the read reports EAGAIN, and true at end-of-file
+     * or once the line callback asks to stop.  Throws std::system_error if
+     * the read fails for any other reason.
+     */
+    bool operator()(int pfd, int cfd) {
+      // Make a splitter for this cfd if it doesn't already exist
+      auto it = fdToSplitter_.find(cfd);
+      auto& splitter = (it != fdToSplitter_.end()) ? it->second
+        : fdToSplitter_.emplace(cfd, LineSplitter(
+            delimiter_, StreamSplitterCallback(fdLineCb_, cfd), maxLineLength_
+          )).first->second;
+      // A runtime-sized `char buf[bufSize_]` is a compiler extension, not
+      // standard C++; use a heap-backed scratch buffer instead.
+      std::vector<char> buf(bufSize_);
+      // Read as much as we can from this FD
+      while (true) {
+        const ssize_t ret = readNoInt(pfd, buf.data(), buf.size());
+        if (ret == -1) {
+          const int err = errno;  // Save before anything can clobber it
+          if (err == EAGAIN) {  // No more data for now
+            return false;
+          }
+          // A genuine read failure: never pass a negative length on to the
+          // splitter.
+          throw std::system_error(err, std::system_category(), "read");
+        }
+        if (ret == 0) {  // Reached end-of-file
+          splitter.flush();  // Ignore return since the file is over anyway
+          return true;
+        }
+        if (!splitter(StringPiece(buf.data(), static_cast<size_t>(ret)))) {
+          return true;  // The callback told us to stop
+        }
+      }
+    }
+
+   private:
+    Callback fdLineCb_;
+    const uint64_t maxLineLength_;
+    const char delimiter_;
+    const uint64_t bufSize_;
+    // We lazily make splitters for all cfds that get used.
+    std::unordered_map<int, LineSplitter> fdToSplitter_;
+  };
+
+  // Helper to enable template deduction.
+  //
+  // fdLineCb is a forwarding reference, so the deduced type is decayed
+  // before it is used as the ReadLinesCallback template argument, and the
+  // argument is forwarded (not unconditionally moved) into a temporary of
+  // that decayed type: rvalue callbacks are moved, lvalue callbacks are
+  // copied and left untouched.
+  template <class Callback>
+  static ReadLinesCallback<typename std::decay<Callback>::type>
+  readLinesCallback(
+      Callback&& fdLineCb,
+      uint64_t maxLineLength = 0,  // No line length limit by default
+      char delimiter = '\n',
+      uint64_t bufSize = 1024) {
+    typedef typename std::decay<Callback>::type Fn;
+    return ReadLinesCallback<Fn>(
+        Fn(std::forward<Callback>(fdLineCb)), maxLineLength, delimiter, bufSize
+    );
+  }
+
+ /**
+ * Enable notifications (callbacks) for one pipe to/from child. By default,
+ * all are enabled. Useful for "chatty" communication -- you want to disable
+ * write callbacks until you receive the expected message.
+ */
+ void enableNotifications(int childFd, bool enabled);
+
+ /**
+ * Are notifications for one pipe to/from child enabled?
+ */
+ bool notificationsEnabled(int childFd) const;
+
/**
* Return the child's pid, or -1 if the child wasn't successfully spawned
* or has already been wait()ed upon.
// Actions to run in child.
// Note that this runs after vfork(), so tread lightly.
// Returns 0 on success, or an errno value on failure.
- int prepareChild(const Options& options, const sigset_t* sigmask) const;
+ int prepareChild(const Options& options,
+ const sigset_t* sigmask,
+ const char* childDir) const;
int runChild(const char* executable, char** argv, char** env,
const Options& options) const;
// so we're happy with a vector here, even if it means linear erase.
// sorted by childFd
struct PipeInfo : private boost::totally_ordered<PipeInfo> {
- int parentFd;
- int childFd;
- int direction; // one of PIPE_IN / PIPE_OUT
+ int parentFd = -1;
+ int childFd = -1;
+ int direction = PIPE_IN; // one of PIPE_IN / PIPE_OUT
+ bool enabled = true;
+
bool operator<(const PipeInfo& other) const {
return childFd < other.childFd;
}
return *this;
}
-inline Subprocess::CommunicateFlags& Subprocess::CommunicateFlags::operator|=(
- const Subprocess::CommunicateFlags& other) {
- if (this == &other) return *this;
- writeStdin_ |= other.writeStdin_;
- readStdout_ |= other.readStdout_;
- readStderr_ |= other.readStderr_;
- return *this;
-}
-
} // namespace folly
#endif /* FOLLY_SUBPROCESS_H_ */