/*
- * Copyright 2012 Facebook, Inc.
+ * Copyright 2014 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* to complete, returning the exit status.
*
* A thread-safe version of popen() (type="r", to read from the child):
- * Subprocess proc(cmd, Subprocess::Options().stdout(Subprocess::PIPE));
+ * Subprocess proc(cmd, Subprocess::pipeStdout());
* // read from proc.stdout()
* proc.wait();
*
- * A thread-safe version of popen() (type="w", to write from the child):
- * Subprocess proc(cmd, Subprocess::Options().stdin(Subprocess::PIPE));
+ * A thread-safe version of popen() (type="w", to write to the child):
+ * Subprocess proc(cmd, Subprocess::pipeStdin());
* // write to proc.stdin()
* proc.wait();
*
#include <sys/types.h>
#include <signal.h>
+#if __APPLE__
+#include <sys/wait.h>
+#else
#include <wait.h>
+#endif
#include <exception>
#include <vector>
#include <boost/operators.hpp>
#include <boost/noncopyable.hpp>
-#include "folly/experimental/io/IOBufQueue.h"
-#include "folly/MapUtil.h"
-#include "folly/Portability.h"
-#include "folly/Range.h"
+#include <folly/FileUtil.h>
+#include <folly/gen/String.h>
+#include <folly/io/IOBufQueue.h>
+#include <folly/MapUtil.h>
+#include <folly/Portability.h>
+#include <folly/Range.h>
namespace folly {
int rawStatus_;
};
+/**
+ * Base exception thrown by the Subprocess methods.
+ */
+class SubprocessError : public std::exception {};
+
/**
* Exception thrown by *Checked methods of Subprocess.
*/
-class CalledProcessError : public std::exception {
+class CalledProcessError : public SubprocessError {
public:
explicit CalledProcessError(ProcessReturnCode rc);
~CalledProcessError() throw() { }
std::string what_;
};
+/**
+ * Exception thrown if the subprocess cannot be started.
+ */
+class SubprocessSpawnError : public SubprocessError {
+ public:
+ SubprocessSpawnError(const char* executable, int errCode, int errnoValue);
+ ~SubprocessSpawnError() throw() {}
+ const char* what() const throw() FOLLY_OVERRIDE { return what_.c_str(); }
+ int errnoValue() const { return errnoValue_; }
+
+ private:
+ int errnoValue_;
+ std::string what_;
+};
+
/**
* Subprocess.
*/
* the close-on-exec flag is set (fcntl FD_CLOEXEC) and inherited
* otherwise.
*/
- class Options {
+ class Options : private boost::orable<Options> {
friend class Subprocess;
public:
- Options() : closeOtherFds_(false), usePath_(false) { }
+ Options()
+ : closeOtherFds_(false),
+ usePath_(false) {
+ }
/**
* Change action for file descriptor fd.
/**
* Shortcut to change the action for standard input.
*/
- Options& stdin(int action) { return fd(0, action); }
+ Options& stdin(int action) { return fd(STDIN_FILENO, action); }
/**
* Shortcut to change the action for standard output.
*/
- Options& stdout(int action) { return fd(1, action); }
+ Options& stdout(int action) { return fd(STDOUT_FILENO, action); }
/**
* Shortcut to change the action for standard error.
* Note that stderr(1) will redirect the standard error to the same
* file descriptor as standard output; the equivalent of bash's "2>&1"
*/
- Options& stderr(int action) { return fd(2, action); }
+ Options& stderr(int action) { return fd(STDERR_FILENO, action); }
+
+ Options& pipeStdin() { return fd(STDIN_FILENO, PIPE_IN); }
+ Options& pipeStdout() { return fd(STDOUT_FILENO, PIPE_OUT); }
+ Options& pipeStderr() { return fd(STDERR_FILENO, PIPE_OUT); }
/**
* Close all other fds (other than standard input, output, error,
* Use the search path ($PATH) when searching for the executable.
*/
Options& usePath() { usePath_ = true; return *this; }
+
+ /**
+ * Change the child's working directory, after the vfork.
+ */
+ Options& chdir(const std::string& dir) { childDir_ = dir; return *this; }
+
+#if __linux__
+ /**
+ * Child will receive a signal when the parent exits.
+ */
+ Options& parentDeathSignal(int sig) {
+ parentDeathSignal_ = sig;
+ return *this;
+ }
+#endif
+
+ /**
+ * Helpful way to combine Options.
+ */
+ Options& operator|=(const Options& other);
+
private:
typedef boost::container::flat_map<int, int> FdMap;
FdMap fdActions_;
bool closeOtherFds_;
bool usePath_;
+ std::string childDir_; // "" keeps the parent's working directory
+#if __linux__
+ int parentDeathSignal_{0};
+#endif
};
+ static Options pipeStdin() { return Options().stdin(PIPE); }
+ static Options pipeStdout() { return Options().stdout(PIPE); }
+ static Options pipeStderr() { return Options().stderr(PIPE); }
+
/**
* Create a subprocess from the given arguments. argv[0] must be listed.
* If not-null, executable must be the actual executable
const std::vector<std::string>* env = nullptr);
/**
- * Append all data, close the stdin (to-child) fd, and read all data,
- * except that this is done in a safe manner to prevent deadlocking.
+ * Communicate with the child until all pipes to/from the child are closed.
*
- * If WRITE_STDIN is given in flags, the process must have been opened with
- * stdinFd=PIPE.
+ * The input buffer is written to the process' stdin pipe, and data is read
+ * from the stdout and stderr pipes. Non-blocking I/O is performed on all
+ * pipes simultaneously to avoid deadlocks.
*
- * If READ_STDOUT is given in flags, the first returned value will be the
- * value read from the child's stdout; the child must have been opened with
- * stdoutFd=PIPE.
+ * The stdin pipe will be closed after the full input buffer has been written.
+ * An error will be thrown if a non-empty input buffer is supplied but stdin
+ * was not configured as a pipe.
*
- * If READ_STDERR is given in flags, the second returned value will be the
- * value read from the child's stderr; the child must have been opened with
- * stderrFd=PIPE.
+ * Returns a pair of buffers containing the data read from stdout and stderr.
+ * If stdout or stderr is not a pipe, an empty IOBuf queue will be returned
+ * for the respective buffer.
*
- * Note that communicate() returns when all pipes to/from the child are
- * closed; the child might stay alive after that, so you must still wait().
+ * Note that communicate() and communicateIOBuf() both return when all
+ * pipes to/from the child are closed; the child might stay alive after
+ * that, so you must still wait().
+ *
+ * communicateIOBuf() uses IOBufQueue for buffering (which has the
+ * advantage that it won't try to allocate all data at once), but it does
+ * store the subprocess's entire output in memory before returning.
*
- * communicateIOBuf uses IOBufQueue for buffering (which has the advantage
- * that it won't try to allocate all data at once). communicate
- * uses strings for simplicity.
+ * communicate() uses strings for simplicity.
*/
- enum {
- WRITE_STDIN = 1 << 0,
- READ_STDOUT = 1 << 1,
- READ_STDERR = 1 << 2,
- };
std::pair<IOBufQueue, IOBufQueue> communicateIOBuf(
- int flags = READ_STDOUT,
- IOBufQueue data = IOBufQueue());
+ IOBufQueue input = IOBufQueue());
std::pair<std::string, std::string> communicate(
- int flags = READ_STDOUT,
- StringPiece data = StringPiece());
+ StringPiece input = StringPiece());
/**
* Communicate with the child until all pipes to/from the child are closed.
* identifying the stream; 0 = child's standard input, etc)
*
* The read and write callbacks must read from / write to pfd and return
- * false during normal operation or true at end-of-file;
- * communicate() will then close the pipe. Note that pfd is
- * nonblocking, so be prepared for read() / write() to return -1 and
- * set errno to EAGAIN (in which case you should return false).
+ * false during normal operation. Return true to tell communicate() to
+ * close the pipe. For readCallback, this might send SIGPIPE to the
+ * child, or make its writes fail with EPIPE, so you should generally
+ * avoid returning true unless you've reached end-of-file.
*
* NOTE that you MUST consume all data passed to readCallback (or return
- * false, which will close the pipe, possibly sending SIGPIPE to the child or
- * making its writes fail with EPIPE), and you MUST write to a writable pipe
- * (or return false, which will close the pipe). To do otherwise is an
- * error. You must do this even for pipes you are not interested in.
+ * true to close the pipe). Similarly, you MUST write to a writable pipe
+ * (or return true to close the pipe). To do otherwise is an error that
+ * can result in a deadlock. You must do this even for pipes you are not
+ * interested in.
+ *
+ * Note that pfd is nonblocking, so be prepared for read() / write() to
+ * return -1 and set errno to EAGAIN (in which case you should return
+ * false). Use readNoInt() from FileUtil.h to handle interrupted reads
+ * for you
*
* Note that communicate() returns when all pipes to/from the child are
* closed; the child might stay alive after that, so you must still wait().
*
* Most users won't need to use this; the simpler version of communicate
* (which buffers data in memory) will probably work fine.
+ *
+ * See ReadLinesCallback for an easy way to consume the child's output
+ * streams line-by-line (or tokenized by another delimiter).
*/
typedef std::function<bool(int, int)> FdCallback;
void communicate(FdCallback readCallback, FdCallback writeCallback);
+ /**
+ * A readCallback for Subprocess::communicate() that helps you consume
+ * lines (or other delimited pieces) from your subprocess's file
+ * descriptors. Use the readLinesCallback() helper to get template
+ * deduction. For example:
+ *
+ * auto read_cb = Subprocess::readLinesCallback(
+ * [](int fd, folly::StringPiece s) {
+ * std::cout << fd << " said: " << s;
+ * return false; // Keep reading from the child
+ * }
+ * );
+ * subprocess.communicate(
+ * // ReadLinesCallback contains StreamSplitter contains IOBuf, making
+ * // it noncopyable, whereas std::function must be copyable. So, we
+ * // keep the callback in a local, and instead pass a reference.
+ * std::ref(read_cb),
+ * [](int pdf, int cfd){ return true; } // Don't write to the child
+ * );
+ *
+ * If a file line exceeds maxLineLength, your callback will get some
+ * initial chunks of maxLineLength with no trailing delimiters. The final
+ * chunk of a line is delimiter-terminated iff the delimiter was present
+ * in the input. In particular, the last line in a file always lacks a
+ * delimiter -- so if a file ends on a delimiter, the final line is empty.
+ *
+ * Like a regular communicate() callback, your fdLineCb() normally returns
+ * false. It may return true to tell Subprocess to close the underlying
+ * file descriptor. The child process may then receive SIGPIPE or get
+ * EPIPE errors on writes.
+ */
+ template <class Callback>
+ class ReadLinesCallback {
+ private:
+ // Binds an FD to the client-provided FD+line callback
+ struct StreamSplitterCallback {
+ StreamSplitterCallback(Callback& cb, int fd) : cb_(cb), fd_(fd) { }
+ // The return value semantics are inverted vs StreamSplitter
+ bool operator()(StringPiece s) { return !cb_(fd_, s); }
+ Callback& cb_;
+ int fd_;
+ };
+ typedef gen::StreamSplitter<StreamSplitterCallback> LineSplitter;
+ public:
+ explicit ReadLinesCallback(
+ Callback&& fdLineCb,
+ uint64_t maxLineLength = 0, // No line length limit by default
+ char delimiter = '\n',
+ uint64_t bufSize = 1024
+ ) : fdLineCb_(std::move(fdLineCb)),
+ maxLineLength_(maxLineLength),
+ delimiter_(delimiter),
+ bufSize_(bufSize) {}
+
+ bool operator()(int pfd, int cfd) {
+ // Make a splitter for this cfd if it doesn't already exist
+ auto it = fdToSplitter_.find(cfd);
+ auto& splitter = (it != fdToSplitter_.end()) ? it->second
+ : fdToSplitter_.emplace(cfd, LineSplitter(
+ delimiter_, StreamSplitterCallback(fdLineCb_, cfd), maxLineLength_
+ )).first->second;
+ // Read as much as we can from this FD
+ char buf[bufSize_];
+ while (true) {
+ ssize_t ret = readNoInt(pfd, buf, bufSize_);
+ if (ret == -1 && errno == EAGAIN) { // No more data for now
+ return false;
+ }
+ if (ret == 0) { // Reached end-of-file
+ splitter.flush(); // Ignore return since the file is over anyway
+ return true;
+ }
+ if (!splitter(StringPiece(buf, ret))) {
+ return true; // The callback told us to stop
+ }
+ }
+ }
+
+ private:
+ Callback fdLineCb_;
+ const uint64_t maxLineLength_;
+ const char delimiter_;
+ const uint64_t bufSize_;
+ // We lazily make splitters for all cfds that get used.
+ std::unordered_map<int, LineSplitter> fdToSplitter_;
+ };
+
+ // Helper to enable template deduction
+ template <class Callback>
+ static ReadLinesCallback<Callback> readLinesCallback(
+ Callback&& fdLineCb,
+ uint64_t maxLineLength = 0, // No line length limit by default
+ char delimiter = '\n',
+ uint64_t bufSize = 1024) {
+ return ReadLinesCallback<Callback>(
+ std::move(fdLineCb), maxLineLength, delimiter, bufSize
+ );
+ }
+
+ /**
+ * Enable notifications (callbacks) for one pipe to/from child. By default,
+ * all are enabled. Useful for "chatty" communication -- you want to disable
+ * write callbacks until you receive the expected message.
+ */
+ void enableNotifications(int childFd, bool enabled);
+
+ /**
+ * Are notifications for one pipe to/from child enabled?
+ */
+ bool notificationsEnabled(int childFd) const;
+
/**
* Return the child's pid, or -1 if the child wasn't successfully spawned
* or has already been wait()ed upon.
*/
pid_t pid() const;
- static const int RV_RUNNING = ProcessReturnCode::RV_RUNNING;
- static const int RV_NOT_STARTED = ProcessReturnCode::RV_NOT_STARTED;
-
/**
* Return the child's status (as per wait()) if the process has already
* been waited on, -1 if the process is still running, or -2 if the process
/**
* Wait for the process to terminate and return its status.
- * Similarly to poll, it is illegal to call poll after the process
+ * Similarly to poll, it is illegal to call wait after the process
* has already been reaped or if the process has not successfully started.
*/
ProcessReturnCode wait();
void kill() { sendSignal(SIGKILL); }
private:
+ static const int RV_RUNNING = ProcessReturnCode::RV_RUNNING;
+ static const int RV_NOT_STARTED = ProcessReturnCode::RV_NOT_STARTED;
+
+ // spawn() sets up a pipe to read errors from the child,
+ // then calls spawnInternal() to do the bulk of the work. Once
+ // spawnInternal() returns it reads the error pipe to see if the child
+ // encountered any errors.
void spawn(
std::unique_ptr<const char*[]> argv,
const char* executable,
const Options& options,
const std::vector<std::string>* env);
+ void spawnInternal(
+ std::unique_ptr<const char*[]> argv,
+ const char* executable,
+ Options& options,
+ const std::vector<std::string>* env,
+ int errFd);
- // Action to run in child.
+ // Actions to run in child.
// Note that this runs after vfork(), so tread lightly.
- void runChild(const char* executable, char** argv, char** env,
- const Options& options) const;
+ // Returns 0 on success, or an errno value on failure.
+ int prepareChild(const Options& options,
+ const sigset_t* sigmask,
+ const char* childDir) const;
+ int runChild(const char* executable, char** argv, char** env,
+ const Options& options) const;
+
+ /**
+ * Read from the error pipe, and throw SubprocessSpawnError if the child
+ * failed before calling exec().
+ */
+ void readChildErrorPipe(int pfd, const char* executable);
/**
* Close all file descriptors.
// so we're happy with a vector here, even if it means linear erase.
// sorted by childFd
struct PipeInfo : private boost::totally_ordered<PipeInfo> {
- int parentFd;
- int childFd;
- int direction; // one of PIPE_IN / PIPE_OUT
+ int parentFd = -1;
+ int childFd = -1;
+ int direction = PIPE_IN; // one of PIPE_IN / PIPE_OUT
+ bool enabled = true;
+
bool operator<(const PipeInfo& other) const {
return childFd < other.childFd;
}
std::vector<PipeInfo> pipes_;
};
+inline Subprocess::Options& Subprocess::Options::operator|=(
+ const Subprocess::Options& other) {
+ if (this == &other) return *this;
+ // Replace
+ for (auto& p : other.fdActions_) {
+ fdActions_[p.first] = p.second;
+ }
+ closeOtherFds_ |= other.closeOtherFds_;
+ usePath_ |= other.usePath_;
+ return *this;
+}
+
} // namespace folly
#endif /* FOLLY_SUBPROCESS_H_ */