From 677bed09c795cc810b1a4f697b0b18d0e5e709f0 Mon Sep 17 00:00:00 2001 From: Tudor Bosman Date: Mon, 29 Oct 2012 15:37:49 -0700 Subject: [PATCH] Subprocess library, modeled after python's subprocess module Summary: Surprised we don't have one. The API is modeled after Python's subprocess module, http://docs.python.org/2/library/subprocess.html Inspired by https://www.facebook.com/groups/fbcode/permalink/445399858830192/, plus I needed this functionality now. Test Plan: test added Reviewed By: chip@fb.com FB internal diff: D614056 --- folly/Subprocess.cpp | 636 ++++++++++++++++++++++++++++++++++ folly/Subprocess.h | 458 ++++++++++++++++++++++++ folly/test/SubprocessTest.cpp | 113 ++++++ 3 files changed, 1207 insertions(+) create mode 100644 folly/Subprocess.cpp create mode 100644 folly/Subprocess.h create mode 100644 folly/test/SubprocessTest.cpp diff --git a/folly/Subprocess.cpp b/folly/Subprocess.cpp new file mode 100644 index 00000000..034165c3 --- /dev/null +++ b/folly/Subprocess.cpp @@ -0,0 +1,636 @@ +/* + * Copyright 2012 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "folly/Subprocess.h" + +#include +#include +#include +#include + +#include +#include +#include + +#include +#include + +#include + +#include "folly/Conv.h" +#include "folly/ScopeGuard.h" +#include "folly/String.h" +#include "folly/experimental/io/Cursor.h" + +extern char** environ; + +namespace folly { + +ProcessReturnCode::State ProcessReturnCode::state() const { + if (rawStatus_ == RV_NOT_STARTED) return NOT_STARTED; + if (rawStatus_ == RV_RUNNING) return RUNNING; + if (WIFEXITED(rawStatus_)) return EXITED; + if (WIFSIGNALED(rawStatus_)) return KILLED; + throw std::runtime_error(to( + "Invalid ProcessReturnCode: ", rawStatus_)); +} + +void ProcessReturnCode::enforce(State s) const { + if (state() != s) { + throw std::logic_error(to("Invalid state ", s)); + } +} + +int ProcessReturnCode::exitStatus() const { + enforce(EXITED); + return WEXITSTATUS(rawStatus_); +} + +int ProcessReturnCode::killSignal() const { + enforce(KILLED); + return WTERMSIG(rawStatus_); +} + +bool ProcessReturnCode::coreDumped() const { + enforce(KILLED); + return WCOREDUMP(rawStatus_); +} + +std::string ProcessReturnCode::str() const { + switch (state()) { + case NOT_STARTED: + return "not started"; + case RUNNING: + return "running"; + case EXITED: + return to("exited with status ", exitStatus()); + case KILLED: + return to("killed by signal ", killSignal(), + (coreDumped() ? 
" (core dumped)" : "")); + } + CHECK(false); // unreached +} + +CalledProcessError::CalledProcessError(ProcessReturnCode rc) + : returnCode_(rc), + what_(returnCode_.str()) { +} + +namespace { + +// Copy pointers to the given strings in a format suitable for posix_spawn +std::unique_ptr cloneStrings(const std::vector& s) { + std::unique_ptr d(new const char*[s.size() + 1]); + for (int i = 0; i < s.size(); i++) { + d[i] = s[i].c_str(); + } + d[s.size()] = nullptr; + return d; +} + +// Helper to throw std::system_error +void throwSystemError(int err, const char* msg) __attribute__((noreturn)); +void throwSystemError(int err, const char* msg) { + throw std::system_error(err, std::system_category(), msg); +} + +// Helper to throw std::system_error from errno +void throwSystemError(const char* msg) __attribute__((noreturn)); +void throwSystemError(const char* msg) { + throwSystemError(errno, msg); +} + +// Check a Posix return code (0 on success, error number on error), throw +// on error. +void checkPosixError(int err, const char* msg) { + if (err != 0) { + throwSystemError(err, msg); + } +} + +// Check a traditional Uinx return code (-1 and sets errno on error), throw +// on error. 
+void checkUnixError(ssize_t ret, const char* msg) { + if (ret == -1) { + throwSystemError(msg); + } +} + +// Check a wait() status, throw on non-successful +void checkStatus(ProcessReturnCode returnCode) { + if (returnCode.state() != ProcessReturnCode::EXITED || + returnCode.exitStatus() != 0) { + throw CalledProcessError(returnCode); + } +} + +} // namespace + +Subprocess::Options& Subprocess::Options::fd(int fd, int action) { + if (action == Subprocess::PIPE) { + if (fd == 0) { + action = Subprocess::PIPE_IN; + } else if (fd == 1 || fd == 2) { + action = Subprocess::PIPE_OUT; + } else { + throw std::invalid_argument( + to("Only fds 0, 1, 2 are valid for action=PIPE: ", fd)); + } + } + fdActions_[fd] = action; + return *this; +} + +Subprocess::Subprocess( + const std::vector& argv, + const Options& options, + const char* executable, + const std::vector* env) + : pid_(-1), + returnCode_(RV_NOT_STARTED) { + if (argv.empty()) { + throw std::invalid_argument("argv must not be empty"); + } + if (!executable) executable = argv[0].c_str(); + spawn(cloneStrings(argv), executable, options, env); +} + +Subprocess::Subprocess( + const std::string& cmd, + const Options& options, + const std::vector* env) + : pid_(-1), + returnCode_(RV_NOT_STARTED) { + if (options.usePath_) { + throw std::invalid_argument("usePath() not allowed when running in shell"); + } + const char* shell = getenv("SHELL"); + if (!shell) { + shell = "/bin/sh"; + } + + std::unique_ptr argv(new const char*[4]); + argv[0] = shell; + argv[1] = "-c"; + argv[2] = cmd.c_str(); + argv[3] = nullptr; + spawn(std::move(argv), shell, options, env); +} + +Subprocess::~Subprocess() { + if (returnCode_.state() == ProcessReturnCode::RUNNING) { + LOG(ERROR) << "Subprocess destroyed without reaping; killing child."; + try { + kill(); + wait(); + } catch (...) { + LOG(FATAL) << "Killing child failed, terminating: " + << exceptionStr(std::current_exception()); + } + } + try { + closeAll(); + } catch (...) 
{ + LOG(FATAL) << "close failed, terminating: " + << exceptionStr(std::current_exception()); + } +} + +namespace { +void closeChecked(int fd) { + checkUnixError(::close(fd), "close"); +} +} // namespace + +void Subprocess::closeAll() { + for (auto& p : pipes_) { + closeChecked(p.parentFd); + } + pipes_.clear(); +} + +void Subprocess::setAllNonBlocking() { + for (auto& p : pipes_) { + int fd = p.parentFd; + int flags = ::fcntl(fd, F_GETFL); + checkUnixError(flags, "fcntl"); + int r = ::fcntl(fd, F_SETFL, flags | O_NONBLOCK); + checkUnixError(r, "fcntl"); + } +} + +// Create the pipes requested in optionsIn, then vfork() and exec the child. +// On success pid_, returnCode_ and pipes_ describe the running child. +// Throws std::invalid_argument if usePath() is combined with env, and +// std::system_error if pipe() or vfork() fails; in that case every +// descriptor created here is closed before the exception propagates. +void Subprocess::spawn( + std::unique_ptr argv, + const char* executable, + const Options& optionsIn, + const std::vector* env) { + if (optionsIn.usePath_ && env) { + throw std::invalid_argument( + "usePath() not allowed when overriding environment"); + } + + // Make a copy, we'll mutate options + Options options(optionsIn); + + // Parent work, pre-fork: create pipes + std::vector childFds; + try { + for (auto& p : options.fdActions_) { + if (p.second == PIPE_IN || p.second == PIPE_OUT) { + int fds[2]; + int r = ::pipe(fds); + checkUnixError(r, "pipe"); + PipeInfo pinfo; + pinfo.direction = p.second; + int cfd; + if (p.second == PIPE_IN) { + // Child gets reading end + pinfo.parentFd = fds[1]; + cfd = fds[0]; + } else { + pinfo.parentFd = fds[0]; + cfd = fds[1]; + } + p.second = cfd; // ensure it gets dup2()ed + pinfo.childFd = p.first; + childFds.push_back(cfd); + pipes_.push_back(pinfo); + } + } + + // This should already be sorted, as options.fdActions_ is + DCHECK(std::is_sorted(pipes_.begin(), pipes_.end())); + + // Note that the const casts below are legit, per + // http://pubs.opengroup.org/onlinepubs/009695399/functions/exec.html + + char** argVec = const_cast(argv.get()); + + // Set up environment + std::unique_ptr envHolder; + char** envVec; + if (env) { + envHolder = cloneStrings(*env); + envVec = const_cast(envHolder.get()); + } else { + envVec = environ; + } + + pid_t pid = vfork(); + if (pid == 0) { 
+ runChild(executable, argVec, envVec, options); + // This should never return, but there's nothing else we can do here. + abort(); + } + + // In parent + checkUnixError(pid, "vfork"); + pid_ = pid; + returnCode_ = ProcessReturnCode(RV_RUNNING); + } catch (...) { + // spawn() is only called from the constructors, so ~Subprocess will not + // run when we throw: release both ends of every pipe created so far. + // close() errors are ignored so that the original exception propagates. + for (int f : childFds) { + ::close(f); + } + for (auto& p : pipes_) { + ::close(p.parentFd); + } + pipes_.clear(); + throw; + } + + // Parent work, post-fork: close child's ends of pipes + for (int f : childFds) { + closeChecked(f); + } +} + +namespace { + +// Checked version of close() to use in the child: abort() on error +void childClose(int fd) { + int r = ::close(fd); + if (r == -1) abort(); +} + +// Checked version of dup2() to use in the child: abort() on error +void childDup2(int oldfd, int newfd) { + int r = ::dup2(oldfd, newfd); + if (r == -1) abort(); +} + +} // namespace + +void Subprocess::runChild(const char* executable, + char** argv, char** env, + const Options& options) const { + // Close parent's ends of all pipes + for (auto& p : pipes_) { + childClose(p.parentFd); + } + + // Close all fds that we're supposed to close. + // Note that we're ignoring errors here, in case some of these + // fds were set to close on exec. + for (auto& p : options.fdActions_) { + if (p.second == CLOSE) { + ::close(p.first); + } else { + childDup2(p.second, p.first); + } + } + + // If requested, close all other file descriptors. Don't close + // any fds in options.fdActions_, and don't touch stdin, stdout, stderr. + // Ignore errors. + if (options.closeOtherFds_) { + for (int fd = getdtablesize() - 1; fd >= 3; --fd) { + if (options.fdActions_.count(fd) == 0) { + ::close(fd); + } + } + } + + // Now, finally, exec. + int r; + if (options.usePath_) { + ::execvp(executable, argv); + } else { + ::execve(executable, argv, env); + } + + // If we're here, something's wrong. 
+ abort(); +} + +ProcessReturnCode Subprocess::poll() { + returnCode_.enforce(ProcessReturnCode::RUNNING); + DCHECK_GT(pid_, 0); + int status; + pid_t found = ::waitpid(pid_, &status, WNOHANG); + checkUnixError(found, "waitpid"); + if (found != 0) { + returnCode_ = ProcessReturnCode(status); + pid_ = -1; + } + return returnCode_; +} + +bool Subprocess::pollChecked() { + if (poll().state() == ProcessReturnCode::RUNNING) { + return false; + } + checkStatus(returnCode_); + return true; +} + +// Block until the child terminates, record its status and return it. +// Throws std::logic_error unless the child is currently RUNNING, and +// std::system_error if waitpid() fails. +ProcessReturnCode Subprocess::wait() { + returnCode_.enforce(ProcessReturnCode::RUNNING); + DCHECK_GT(pid_, 0); + int status; + pid_t found; + // A blocking waitpid() can be interrupted by a signal handler; retry, + // as the read / write / poll loops in this file do. + do { + found = ::waitpid(pid_, &status, 0); + } while (found == -1 && errno == EINTR); + checkUnixError(found, "waitpid"); + returnCode_ = ProcessReturnCode(status); + // The child has been reaped: forget its pid, as poll() does. + pid_ = -1; + return returnCode_; +} + +void Subprocess::waitChecked() { + wait(); + checkStatus(returnCode_); +} + +void Subprocess::sendSignal(int signal) { + returnCode_.enforce(ProcessReturnCode::RUNNING); + int r = ::kill(pid_, signal); + checkUnixError(r, "kill"); +} + +namespace { +void setNonBlocking(int fd) { + int flags = ::fcntl(fd, F_GETFL); + checkUnixError(flags, "fcntl"); + int r = ::fcntl(fd, F_SETFL, flags | O_NONBLOCK); + checkUnixError(r, "fcntl"); +} + +std::pair queueFront(const IOBufQueue& queue) { + auto* p = queue.front(); + if (!p) return std::make_pair(nullptr, 0); + return io::Cursor(p).peek(); +} + +// fd write +bool handleWrite(int fd, IOBufQueue& queue) { + for (;;) { + auto p = queueFront(queue); + if (p.second == 0) { + return true; // EOF + } + + ssize_t n; + do { + n = ::write(fd, p.first, p.second); + } while (n == -1 && errno == EINTR); + if (n == -1 && errno == EAGAIN) { + return false; + } + checkUnixError(n, "write"); + queue.trimStart(n); + } +} + +// fd read +bool handleRead(int fd, IOBufQueue& queue) { + for (;;) { + auto p = queue.preallocate(100, 65000); + ssize_t n; + do { + n = ::read(fd, p.first, p.second); + } while (n == -1 && errno == EINTR); + if (n == -1 && errno == EAGAIN) { + return false; + } 
+ checkUnixError(n, "read"); + if (n == 0) { + return true; + } + queue.postallocate(n); + } +} + +bool discardRead(int fd) { + static const size_t bufSize = 65000; + // Thread unsafe, but it doesn't matter. + static std::unique_ptr buf(new char[bufSize]); + + for (;;) { + ssize_t n; + do { + n = ::read(fd, buf.get(), bufSize); + } while (n == -1 && errno == EINTR); + if (n == -1 && errno == EAGAIN) { + return false; + } + checkUnixError(n, "read"); + if (n == 0) { + return true; + } + } +} + +} // namespace + +std::pair Subprocess::communicate( + int flags, + StringPiece data) { + IOBufQueue dataQueue; + dataQueue.wrapBuffer(data.data(), data.size()); + + auto outQueues = communicateIOBuf(flags, std::move(dataQueue)); + auto outBufs = std::make_pair(outQueues.first.move(), + outQueues.second.move()); + std::pair out; + if (outBufs.first) { + outBufs.first->coalesce(); + out.first.assign(reinterpret_cast(outBufs.first->data()), + outBufs.first->length()); + } + if (outBufs.second) { + outBufs.second->coalesce(); + out.second.assign(reinterpret_cast(outBufs.second->data()), + outBufs.second->length()); + } + return out; +} + +std::pair Subprocess::communicateIOBuf( + int flags, + IOBufQueue data) { + std::pair out; + + auto readCallback = [&, flags] (int pfd, int cfd) { + if (cfd == 1 && (flags & READ_STDOUT)) { + return handleRead(pfd, out.first); + } else if (cfd == 2 && (flags & READ_STDERR)) { + return handleRead(pfd, out.second); + } else { + // Don't close the file descriptor, the child might not like SIGPIPE, + // just read and throw the data away. + return discardRead(pfd); + } + }; + + auto writeCallback = [&, flags] (int pfd, int cfd) { + if (cfd == 0 && (flags & WRITE_STDIN)) { + return handleWrite(pfd, data); + } else { + // If we don't want to write to this fd, just close it. 
+ return true; // true tells communicate() to close this pipe + } + }; + + communicate(std::move(readCallback), std::move(writeCallback)); + + return out; +} + +void Subprocess::communicate(FdCallback readCallback, + FdCallback writeCallback) { + returnCode_.enforce(ProcessReturnCode::RUNNING); + setAllNonBlocking(); + + std::vector fds; + fds.reserve(pipes_.size()); + std::vector toClose; + toClose.reserve(pipes_.size()); + + while (!pipes_.empty()) { + fds.clear(); + toClose.clear(); + + for (auto& p : pipes_) { + pollfd pfd; + pfd.fd = p.parentFd; + // Yes, backwards, PIPE_IN / PIPE_OUT are defined from the + // child's point of view. + pfd.events = (p.direction == PIPE_IN ? POLLOUT : POLLIN); + fds.push_back(pfd); + } + + int r; + do { + r = ::poll(fds.data(), fds.size(), -1); + } while (r == -1 && errno == EINTR); + checkUnixError(r, "poll"); + + for (int i = 0; i < pipes_.size(); ++i) { + auto& p = pipes_[i]; + DCHECK_EQ(fds[i].fd, p.parentFd); + short events = fds[i].revents; + + bool closed = false; + if (events & POLLOUT) { + DCHECK(!(events & POLLIN)); + if (writeCallback(p.parentFd, p.childFd)) { + toClose.push_back(i); + closed = true; + } + } + + if (events & POLLIN) { + DCHECK(!(events & POLLOUT)); + if (readCallback(p.parentFd, p.childFd)) { + toClose.push_back(i); + closed = true; + } + } + + if ((events & (POLLHUP | POLLERR)) && !closed) { + toClose.push_back(i); + closed = true; + } + } + + // Close the fds in reverse order so the indexes hold after erase() + for (int idx : boost::adaptors::reverse(toClose)) { + auto pos = pipes_.begin() + idx; + closeChecked(pos->parentFd); + pipes_.erase(pos); + } + } +} + +int Subprocess::findByChildFd(int childFd) const { + auto pos = std::lower_bound( + pipes_.begin(), pipes_.end(), childFd, + [] (const PipeInfo& info, int fd) { return info.childFd < fd; }); + if (pos == pipes_.end() || pos->childFd != childFd) { + throw std::invalid_argument(folly::to( + "child fd not found ", childFd)); + } + return pos - pipes_.begin(); +} + +void 
Subprocess::closeParentFd(int childFd) { + int idx = findByChildFd(childFd); + closeChecked(pipes_[idx].parentFd); + pipes_.erase(pipes_.begin() + idx); +} + +namespace { + +class Initializer { + public: + Initializer() { + // We like EPIPE, thanks. + ::signal(SIGPIPE, SIG_IGN); + } +}; + +Initializer initializer; + +} // namespace + +} // namespace folly + diff --git a/folly/Subprocess.h b/folly/Subprocess.h new file mode 100644 index 00000000..de35965a --- /dev/null +++ b/folly/Subprocess.h @@ -0,0 +1,458 @@ +/* + * Copyright 2012 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Subprocess library, modeled after Python's subprocess module + * (http://docs.python.org/2/library/subprocess.html) + * + * This library defines one class (Subprocess) which represents a child + * process. Subprocess has two constructors: one that takes a vector + * and executes the given executable without using the shell, and one + * that takes a string and executes the given command using the shell. + * Subprocess allows you to redirect the child's standard input, standard + * output, and standard error to/from child descriptors in the parent, + * or to create communication pipes between the child and the parent. + * + * The simplest example is a thread-safe version of the system() library + * function: + * Subprocess(cmd).wait(); + * which executes the command using the default shell and waits for it + * to complete, returning the exit status. 
+ * + * A thread-safe version of popen() (type="r", to read from the child): + * Subprocess proc(cmd, Subprocess::Options().stdout(Subprocess::PIPE)); + * // read from proc.stdout() + * proc.wait(); + * + * A thread-safe version of popen() (type="w", to write to the child): + * Subprocess proc(cmd, Subprocess::Options().stdin(Subprocess::PIPE)); + * // write to proc.stdin() + * proc.wait(); + * + * If you want to redirect both stdin and stdout to pipes, you can, but + * note that you're subject to a variety of deadlocks. You'll want to use + * nonblocking I/O; look at the implementation of communicate() for an example. + * + * communicate() is a way to communicate to a child via its standard input, + * standard output, and standard error. It buffers everything in memory, + * so it's not great for large amounts of data (or long-running processes), + * but it insulates you from the deadlocks mentioned above. + */ +#ifndef FOLLY_SUBPROCESS_H_ +#define FOLLY_SUBPROCESS_H_ + +#include +#include +#include + +#include +#include +#include + +#include +#include +#include + +#include "folly/experimental/io/IOBufQueue.h" +#include "folly/MapUtil.h" +#include "folly/Portability.h" +#include "folly/Range.h" + +namespace folly { + +/** + * Class to wrap a process return code. + */ +class Subprocess; +class ProcessReturnCode { + friend class Subprocess; + public: + enum State { + NOT_STARTED, + RUNNING, + EXITED, + KILLED + }; + + /** + * Process state. One of: + * NOT_STARTED: process hasn't been started successfully + * RUNNING: process is currently running + * EXITED: process exited (successfully or not) + * KILLED: process was killed by a signal. + */ + State state() const; + + /** + * Helper wrappers around state(). + */ + bool notStarted() const { return state() == NOT_STARTED; } + bool running() const { return state() == RUNNING; } + bool exited() const { return state() == EXITED; } + bool killed() const { return state() == KILLED; } + + /** + * Exit status. 
Only valid if state() == EXITED; throws otherwise. + */ + int exitStatus() const; + + /** + * Signal that caused the process's termination. Only valid if + * state() == KILLED; throws otherwise. + */ + int killSignal() const; + + /** + * Was a core file generated? Only valid if state() == KILLED; throws + * otherwise. + */ + bool coreDumped() const; + + /** + * String representation; one of + * "not started" + * "running" + * "exited with status " + * "killed by signal " + * "killed by signal (core dumped)" + */ + std::string str() const; + + /** + * Helper function to enforce a precondition based on this. + * Throws std::logic_error if in an unexpected state. + */ + void enforce(State state) const; + private: + explicit ProcessReturnCode(int rv) : rawStatus_(rv) { } + static constexpr int RV_NOT_STARTED = -2; + static constexpr int RV_RUNNING = -1; + + int rawStatus_; +}; + +/** + * Exception thrown by *Checked methods of Subprocess. + */ +class CalledProcessError : public std::exception { + public: + explicit CalledProcessError(ProcessReturnCode rc); + ~CalledProcessError() throw() { } + const char* what() const throw() FOLLY_OVERRIDE { return what_.c_str(); } + ProcessReturnCode returnCode() const { return returnCode_; } + private: + ProcessReturnCode returnCode_; + std::string what_; +}; + +/** + * Subprocess. + */ +class Subprocess : private boost::noncopyable { + public: + static const int CLOSE = -1; + static const int PIPE = -2; + static const int PIPE_IN = -3; + static const int PIPE_OUT = -4; + + /** + * Class representing various options: file descriptor behavior, and + * whether to use $PATH for searching for the executable, + * + * By default, we don't use $PATH, file descriptors are closed if + * the close-on-exec flag is set (fcntl FD_CLOEXEC) and inherited + * otherwise. + */ + class Options { + friend class Subprocess; + public: + Options() : closeOtherFds_(false), usePath_(false) { } + + /** + * Change action for file descriptor fd. 
+ * + * "action" may be another file descriptor number (dup2()ed before the + * child execs), or one of CLOSE, PIPE_IN, and PIPE_OUT. + * + * CLOSE: close the file descriptor in the child + * PIPE_IN: open a pipe *to* the child (the child reads, the parent writes) + * PIPE_OUT: open a pipe *from* the child (the child writes, the parent reads) + * + * PIPE is a shortcut; same as PIPE_IN for stdin (fd 0), same as + * PIPE_OUT for stdout (fd 1) or stderr (fd 2), and an error for + * other file descriptors. + */ + Options& fd(int fd, int action); + + /** + * Shortcut to change the action for standard input. + */ + Options& stdin(int action) { return fd(0, action); } + + /** + * Shortcut to change the action for standard output. + */ + Options& stdout(int action) { return fd(1, action); } + + /** + * Shortcut to change the action for standard error. + * Note that stderr(1) will redirect the standard error to the same + * file descriptor as standard output; the equivalent of bash's "2>&1" + */ + Options& stderr(int action) { return fd(2, action); } + + /** + * Close all other fds (other than standard input, output, error, + * and file descriptors explicitly specified with fd()). + * + * This is potentially slow; it's generally a better idea to + * set the close-on-exec flag on all file descriptors that shouldn't + * be inherited by the child. + * + * Even with this option set, standard input, output, and error are + * not closed; use stdin(CLOSE), stdout(CLOSE), stderr(CLOSE) if you + * desire this. + */ + Options& closeOtherFds() { closeOtherFds_ = true; return *this; } + + /** + * Use the search path ($PATH) when searching for the executable. + */ + Options& usePath() { usePath_ = true; return *this; } + private: + typedef boost::container::flat_map FdMap; + FdMap fdActions_; + bool closeOtherFds_; + bool usePath_; + }; + + /** + * Create a subprocess from the given arguments. argv[0] must be listed. + * If not-null, executable must be the actual executable + * being used (otherwise it's the same as argv[0]). 
+ * + * If env is not-null, it must contain name=value strings to be used + * as the child's environment; otherwise, we inherit the environment + * from the parent. env must be null if options.usePath is set. + */ + explicit Subprocess( + const std::vector& argv, + const Options& options = Options(), + const char* executable = nullptr, + const std::vector* env = nullptr); + ~Subprocess(); + + /** + * Create a subprocess run as a shell command (as shell -c 'command') + * + * The shell to use is taken from the environment variable $SHELL, + * or /bin/sh if $SHELL is unset. + */ + explicit Subprocess( + const std::string& cmd, + const Options& options = Options(), + const std::vector* env = nullptr); + + /** + * Append all data, close the stdin (to-child) fd, and read all data, + * except that this is done in a safe manner to prevent deadlocking. + * + * If WRITE_STDIN is given in flags, the process must have been opened with + * stdinFd=PIPE. + * + * If READ_STDOUT is given in flags, the first returned value will be the + * value read from the child's stdout; the child must have been opened with + * stdoutFd=PIPE. + * + * If READ_STDERR is given in flags, the second returned value will be the + * value read from the child's stderr; the child must have been opened with + * stderrFd=PIPE. + * + * Note that communicate() returns when all pipes to/from the child are + * closed; the child might stay alive after that, so you must still wait(). + * + * communicateIOBuf uses IOBufQueue for buffering (which has the advantage + * that it won't try to allocate all data at once). communicate + * uses strings for simplicity. + */ + enum { + WRITE_STDIN = 1 << 0, + READ_STDOUT = 1 << 1, + READ_STDERR = 1 << 2, + }; + std::pair communicateIOBuf( + int flags = READ_STDOUT, + IOBufQueue data = IOBufQueue()); + + std::pair communicate( + int flags = READ_STDOUT, + StringPiece data = StringPiece()); + + /** + * Communicate with the child until all pipes to/from the child are closed. 
+ * + * readCallback(pfd, cfd) will be called whenever there's data available + * on any pipe *from* the child (PIPE_OUT). pfd is the file descriptor + * in the parent (that you use to read from); cfd is the file descriptor + * in the child (used for identifying the stream; 1 = child's standard + * output, 2 = child's standard error, etc) + * + * writeCallback(pfd, cfd) will be called whenever a pipe *to* the child is + * writable (PIPE_IN). pfd is the file descriptor in the parent (that you + * use to write to); cfd is the file descriptor in the child (used for + * identifying the stream; 0 = child's standard input, etc) + * + * The read and write callbacks must read from / write to pfd and return + * false during normal operation or true at end-of-file; + * communicate() will then close the pipe. Note that pfd is + * nonblocking, so be prepared for read() / write() to return -1 and + * set errno to EAGAIN (in which case you should return false). + * + * NOTE that you MUST consume all data passed to readCallback (or return + * true, which will close the pipe, possibly sending SIGPIPE to the child or + * making its writes fail with EPIPE), and you MUST write to a writable pipe + * (or return true, which will close the pipe). To do otherwise is an + * error. You must do this even for pipes you are not interested in. + * + * Note that communicate() returns when all pipes to/from the child are + * closed; the child might stay alive after that, so you must still wait(). + * + * Most users won't need to use this; the simpler version of communicate + * (which buffers data in memory) will probably work fine. + */ + typedef std::function FdCallback; + void communicate(FdCallback readCallback, FdCallback writeCallback); + + /** + * Return the child's pid, or -1 if the child wasn't successfully spawned + * or has already been wait()ed upon. 
+ */ + pid_t pid() const { return pid_; } + + static const int RV_RUNNING = ProcessReturnCode::RV_RUNNING; + static const int RV_NOT_STARTED = ProcessReturnCode::RV_NOT_STARTED; + + /** + * Return the child's status (as per wait()) if the process has already + * been waited on, -1 if the process is still running, or -2 if the process + * hasn't been successfully started. NOTE that this does not poll, but + * returns the status stored in the Subprocess object. + */ + ProcessReturnCode returnCode() const { return returnCode_; } + + /** + * Poll the child's status and return it, return -1 if the process + * is still running. NOTE that it is illegal to call poll again after + * poll indicated that the process has terminated, or to call poll on a + * process that hasn't been successfully started (the constructor threw an + * exception). + */ + ProcessReturnCode poll(); + + /** + * Poll the child's status. If the process is still running, return false. + * Otherwise, return true if the process exited with status 0 (success), + * or throw CalledProcessError if it exited non-zero or was killed by a signal. + */ + bool pollChecked(); + + /** + * Wait for the process to terminate and return its status. + * Similarly to poll, it is illegal to call wait after the process + * has already been reaped or if the process has not successfully started. + */ + ProcessReturnCode wait(); + + /** + * Wait for the process to terminate, throw if unsuccessful. + */ + void waitChecked(); + + /** + * Set all pipes from / to child non-blocking. communicate() does + * this for you. + */ + void setAllNonBlocking(); + + /** + * Get parent file descriptor corresponding to the given file descriptor + * in the child. Throws if childFd isn't a pipe (PIPE_IN / PIPE_OUT). + * Do not close() the return file descriptor; use closeParentFd, below. 
+ */ + int parentFd(int childFd) const { + return pipes_[findByChildFd(childFd)].parentFd; + } + int stdin() const { return parentFd(0); } + int stdout() const { return parentFd(1); } + int stderr() const { return parentFd(2); } + + /** + * Close the parent file descriptor given a file descriptor in the child. + */ + void closeParentFd(int childFd); + + /** + * Send a signal to the child. Shortcuts for the commonly used Unix + * signals are below. + */ + void sendSignal(int signal); + void terminate() { sendSignal(SIGTERM); } + void kill() { sendSignal(SIGKILL); } + + private: + void spawn( + std::unique_ptr argv, + const char* executable, + const Options& options, + const std::vector* env); + + // Action to run in child. + // Note that this runs after vfork(), so tread lightly. + void runChild(const char* executable, char** argv, char** env, + const Options& options) const; + + /** + * Close all file descriptors. + */ + void closeAll(); + + // return index in pipes_ + int findByChildFd(int childFd) const; + + pid_t pid_; + ProcessReturnCode returnCode_; + + // The number of pipes between parent and child is assumed to be small, + // so we're happy with a vector here, even if it means linear erase. + // sorted by childFd + struct PipeInfo : private boost::totally_ordered { + int parentFd; + int childFd; + int direction; // one of PIPE_IN / PIPE_OUT + bool operator<(const PipeInfo& other) const { + return childFd < other.childFd; + } + bool operator==(const PipeInfo& other) const { + return childFd == other.childFd; + } + }; + std::vector pipes_; +}; + +} // namespace folly + +#endif /* FOLLY_SUBPROCESS_H_ */ + diff --git a/folly/test/SubprocessTest.cpp b/folly/test/SubprocessTest.cpp new file mode 100644 index 00000000..7f6b96dd --- /dev/null +++ b/folly/test/SubprocessTest.cpp @@ -0,0 +1,113 @@ +/* + * Copyright 2012 Facebook, Inc. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "folly/Subprocess.h" + +#include +#include + +#include "folly/Format.h" +#include "folly/experimental/io/Stream.h" + +using namespace folly; + +TEST(SimpleSubprocessTest, ExitsSuccessfully) { + Subprocess proc(std::vector{ "/bin/true" }); + EXPECT_EQ(0, proc.wait().exitStatus()); +} + +TEST(SimpleSubprocessTest, ExitsSuccessfullyChecked) { + Subprocess proc(std::vector{ "/bin/true" }); + proc.waitChecked(); +} + +TEST(SimpleSubprocessTest, ExitsWithError) { + Subprocess proc(std::vector{ "/bin/false" }); + EXPECT_EQ(1, proc.wait().exitStatus()); +} + +TEST(SimpleSubprocessTest, ExitsWithErrorChecked) { + Subprocess proc(std::vector{ "/bin/false" }); + EXPECT_THROW(proc.waitChecked(), CalledProcessError); +} + +TEST(SimpleSubprocessTest, ShellExitsSuccesssfully) { + Subprocess proc("true"); + EXPECT_EQ(0, proc.wait().exitStatus()); +} + +TEST(SimpleSubprocessTest, ShellExitsWithError) { + Subprocess proc("false"); + EXPECT_EQ(1, proc.wait().exitStatus()); +} + +TEST(PopenSubprocessTest, PopenRead) { + Subprocess proc("ls /", Subprocess::Options().stdout(Subprocess::PIPE)); + int found = 0; + for (auto bline : byLine(proc.stdout())) { + StringPiece line(bline); + if (line == "etc" || line == "bin" || line == "usr") { + ++found; + } + } + EXPECT_EQ(3, found); + proc.waitChecked(); +} + +TEST(CommunicateSubprocessTest, SimpleRead) { + Subprocess proc(std::vector{ "/bin/echo", "-n", "foo", "bar"}, 
+ Subprocess::Options().stdout(Subprocess::PIPE)); + auto p = proc.communicate(); + EXPECT_EQ("foo bar", p.first); + proc.waitChecked(); +} + +TEST(CommunicateSubprocessTest, BigWrite) { + const int numLines = 1 << 20; + std::string line("hello\n"); + std::string data; + data.reserve(numLines * line.size()); + for (int i = 0; i < numLines; ++i) { + data.append(line); + } + + Subprocess::Options options; + options.stdin(Subprocess::PIPE).stdout(Subprocess::PIPE); + + Subprocess proc("wc -l", options); + auto p = proc.communicate(Subprocess::WRITE_STDIN | Subprocess::READ_STDOUT, + data); + EXPECT_EQ(folly::format("{}\n", numLines).str(), p.first); + proc.waitChecked(); +} + +TEST(CommunicateSubprocessTest, Duplex) { + // Take 10MB of data and pass them through a filter. + // One line, as tr is line-buffered + const int bytes = 10 << 20; + std::string line(bytes, 'x'); + + Subprocess::Options options; + options.stdin(Subprocess::PIPE).stdout(Subprocess::PIPE); + + Subprocess proc("tr a-z A-Z", options); + auto p = proc.communicate(Subprocess::WRITE_STDIN | Subprocess::READ_STDOUT, + line); + EXPECT_EQ(bytes, p.first.size()); + EXPECT_EQ(std::string::npos, p.first.find_first_not_of('X')); + proc.waitChecked(); +} + -- 2.34.1