X-Git-Url: http://demsky.eecs.uci.edu/git/?a=blobdiff_plain;f=folly%2FSubprocess.h;h=311b75d2219d4a67cd914d2f51b06c2951e3379b;hb=a5709e63e29cd69e4de3e9f463747da9b644844c;hp=5acf6235aa0bfc585eb291f0d5f91999a2cde0d6;hpb=217e88e6a6f011a6cc4d44490b319f919574a620;p=folly.git diff --git a/folly/Subprocess.h b/folly/Subprocess.h index 5acf6235..311b75d2 100644 --- a/folly/Subprocess.h +++ b/folly/Subprocess.h @@ -70,6 +70,8 @@ #include #include +#include +#include #include #include #include @@ -203,10 +205,7 @@ class Subprocess : private boost::noncopyable { class Options : private boost::orable { friend class Subprocess; public: - Options() - : closeOtherFds_(false), - usePath_(false) { - } + Options() {} // E.g. https://gcc.gnu.org/bugzilla/show_bug.cgi?id=58328 /** * Change action for file descriptor fd. @@ -279,6 +278,16 @@ class Subprocess : private boost::noncopyable { } #endif + /** + * Child will be made a process group leader when it starts. Upside: one + * can reliably all its kill non-daemonizing descendants. Downside: the + * child will not receive Ctrl-C etc during interactive use. + */ + Options& processGroupLeader() { + processGroupLeader_ = true; + return *this; + } + /** * Helpful way to combine Options. */ @@ -287,12 +296,13 @@ class Subprocess : private boost::noncopyable { private: typedef boost::container::flat_map FdMap; FdMap fdActions_; - bool closeOtherFds_; - bool usePath_; + bool closeOtherFds_{false}; + bool usePath_{false}; std::string childDir_; // "" keeps the parent's working directory #if __linux__ int parentDeathSignal_{0}; #endif + bool processGroupLeader_{false}; }; static Options pipeStdin() { return Options().stdin(PIPE); } @@ -372,26 +382,133 @@ class Subprocess : private boost::noncopyable { * identifying the stream; 0 = child's standard input, etc) * * The read and write callbacks must read from / write to pfd and return - * false during normal operation or true at end-of-file; - * communicate() will then close the pipe. Note that pfd is - * nonblocking, so be prepared for read() / write() to return -1 and - * set errno to EAGAIN (in which case you should return false). + * false during normal operation. Return true to tell communicate() to + * close the pipe. For readCallback, this might send SIGPIPE to the + * child, or make its writes fail with EPIPE, so you should generally + * avoid returning true unless you've reached end-of-file. * * NOTE that you MUST consume all data passed to readCallback (or return - * true, which will close the pipe, possibly sending SIGPIPE to the child or - * making its writes fail with EPIPE), and you MUST write to a writable pipe - * (or return true, which will close the pipe). To do otherwise is an - * error. You must do this even for pipes you are not interested in. + * true to close the pipe). Similarly, you MUST write to a writable pipe + * (or return true to close the pipe). To do otherwise is an error that + * can result in a deadlock. You must do this even for pipes you are not + * interested in. + * + * Note that pfd is nonblocking, so be prepared for read() / write() to + * return -1 and set errno to EAGAIN (in which case you should return + * false). Use readNoInt() from FileUtil.h to handle interrupted reads + * for you * * Note that communicate() returns when all pipes to/from the child are * closed; the child might stay alive after that, so you must still wait(). * * Most users won't need to use this; the simpler version of communicate * (which buffers data in memory) will probably work fine. + * + * See ReadLinesCallback for an easy way to consume the child's output + * streams line-by-line (or tokenized by another delimiter). */ typedef std::function FdCallback; void communicate(FdCallback readCallback, FdCallback writeCallback); + /** + * A readCallback for Subprocess::communicate() that helps you consume + * lines (or other delimited pieces) from your subprocess's file + * descriptors. Use the readLinesCallback() helper to get template + * deduction. For example: + * + * auto read_cb = Subprocess::readLinesCallback( + * [](int fd, folly::StringPiece s) { + * std::cout << fd << " said: " << s; + * return false; // Keep reading from the child + * } + * ); + * subprocess.communicate( + * // ReadLinesCallback contains StreamSplitter contains IOBuf, making + * // it noncopyable, whereas std::function must be copyable. So, we + * // keep the callback in a local, and instead pass a reference. + * std::ref(read_cb), + * [](int pdf, int cfd){ return true; } // Don't write to the child + * ); + * + * If a file line exceeds maxLineLength, your callback will get some + * initial chunks of maxLineLength with no trailing delimiters. The final + * chunk of a line is delimiter-terminated iff the delimiter was present + * in the input. In particular, the last line in a file always lacks a + * delimiter -- so if a file ends on a delimiter, the final line is empty. + * + * Like a regular communicate() callback, your fdLineCb() normally returns + * false. It may return true to tell Subprocess to close the underlying + * file descriptor. The child process may then receive SIGPIPE or get + * EPIPE errors on writes. + */ + template + class ReadLinesCallback { + private: + // Binds an FD to the client-provided FD+line callback + struct StreamSplitterCallback { + StreamSplitterCallback(Callback& cb, int fd) : cb_(cb), fd_(fd) { } + // The return value semantics are inverted vs StreamSplitter + bool operator()(StringPiece s) { return !cb_(fd_, s); } + Callback& cb_; + int fd_; + }; + typedef gen::StreamSplitter LineSplitter; + public: + explicit ReadLinesCallback( + Callback&& fdLineCb, + uint64_t maxLineLength = 0, // No line length limit by default + char delimiter = '\n', + uint64_t bufSize = 1024 + ) : fdLineCb_(std::move(fdLineCb)), + maxLineLength_(maxLineLength), + delimiter_(delimiter), + bufSize_(bufSize) {} + + bool operator()(int pfd, int cfd) { + // Make a splitter for this cfd if it doesn't already exist + auto it = fdToSplitter_.find(cfd); + auto& splitter = (it != fdToSplitter_.end()) ? it->second + : fdToSplitter_.emplace(cfd, LineSplitter( + delimiter_, StreamSplitterCallback(fdLineCb_, cfd), maxLineLength_ + )).first->second; + // Read as much as we can from this FD + char buf[bufSize_]; + while (true) { + ssize_t ret = readNoInt(pfd, buf, bufSize_); + if (ret == -1 && errno == EAGAIN) { // No more data for now + return false; + } + if (ret == 0) { // Reached end-of-file + splitter.flush(); // Ignore return since the file is over anyway + return true; + } + if (!splitter(StringPiece(buf, ret))) { + return true; // The callback told us to stop + } + } + } + + private: + Callback fdLineCb_; + const uint64_t maxLineLength_; + const char delimiter_; + const uint64_t bufSize_; + // We lazily make splitters for all cfds that get used. + std::unordered_map fdToSplitter_; + }; + + // Helper to enable template deduction + template + static ReadLinesCallback readLinesCallback( + Callback&& fdLineCb, + uint64_t maxLineLength = 0, // No line length limit by default + char delimiter = '\n', + uint64_t bufSize = 1024) { + return ReadLinesCallback( + std::move(fdLineCb), maxLineLength, delimiter, bufSize + ); + } + /** * Enable notifications (callbacks) for one pipe to/from child. By default, * all are enabled. Useful for "chatty" communication -- you want to disable @@ -551,10 +668,10 @@ inline Subprocess::Options& Subprocess::Options::operator|=( } closeOtherFds_ |= other.closeOtherFds_; usePath_ |= other.usePath_; + processGroupLeader_ |= other.processGroupLeader_; return *this; } } // namespace folly #endif /* FOLLY_SUBPROCESS_H_ */ -