From 1f4f30304bb22cd7e4320ea73490132259c87d8e Mon Sep 17 00:00:00 2001 From: Alexey Spiridonov Date: Fri, 25 Apr 2014 15:21:49 -0700 Subject: [PATCH] A generic line-reading callback for communicate() Summary: There are a couple of places where this behavior is useful, and it's not 100% trivial to implement it from scratch. Adding it to Folly to save people code & bugs. Test Plan: unit tests Reviewed By: tudorb@fb.com Subscribers: tjackson, folly@lists, tudorb FB internal diff: D1297506 --- folly/Subprocess.cpp | 1 - folly/Subprocess.h | 125 ++++++++++++++++++++++++++++++++++++++++--- 2 files changed, 117 insertions(+), 9 deletions(-) diff --git a/folly/Subprocess.cpp b/folly/Subprocess.cpp index db45c9e4..4cd2c7c1 100644 --- a/folly/Subprocess.cpp +++ b/folly/Subprocess.cpp @@ -39,7 +39,6 @@ #include #include -#include #include #include #include diff --git a/folly/Subprocess.h b/folly/Subprocess.h index 5acf6235..a0a5854f 100644 --- a/folly/Subprocess.h +++ b/folly/Subprocess.h @@ -70,6 +70,8 @@ #include #include +#include +#include #include #include #include @@ -372,26 +374,133 @@ class Subprocess : private boost::noncopyable { * identifying the stream; 0 = child's standard input, etc) * * The read and write callbacks must read from / write to pfd and return - * false during normal operation or true at end-of-file; - * communicate() will then close the pipe. Note that pfd is - * nonblocking, so be prepared for read() / write() to return -1 and - * set errno to EAGAIN (in which case you should return false). + * false during normal operation. Return true to tell communicate() to + * close the pipe. For readCallback, this might send SIGPIPE to the + * child, or make its writes fail with EPIPE, so you should generally + * avoid returning true unless you've reached end-of-file. 
* * NOTE that you MUST consume all data passed to readCallback (or return - * true, which will close the pipe, possibly sending SIGPIPE to the child or - * making its writes fail with EPIPE), and you MUST write to a writable pipe - * (or return true, which will close the pipe). To do otherwise is an - * error. You must do this even for pipes you are not interested in. + * true to close the pipe). Similarly, you MUST write to a writable pipe + * (or return true to close the pipe). To do otherwise is an error that + * can result in a deadlock. You must do this even for pipes you are not + * interested in. + * + * Note that pfd is nonblocking, so be prepared for read() / write() to + * return -1 and set errno to EAGAIN (in which case you should return + * false). Use readNoInt() from FileUtil.h to handle interrupted reads + * for you. + * + * Note that communicate() returns when all pipes to/from the child are * closed; the child might stay alive after that, so you must still wait(). * * Most users won't need to use this; the simpler version of communicate * (which buffers data in memory) will probably work fine. + * + * See ReadLinesCallback for an easy way to consume the child's output + * streams line-by-line (or tokenized by another delimiter). */ typedef std::function FdCallback; void communicate(FdCallback readCallback, FdCallback writeCallback); + /** + * A readCallback for Subprocess::communicate() that helps you consume + * lines (or other delimited pieces) from your subprocess's file + * descriptors. Use the readLinesCallback() helper to get template + * deduction. For example: + * + * auto read_cb = Subprocess::readLinesCallback( + * [](int fd, folly::StringPiece s) { + * std::cout << fd << " said: " << s; + * return false; // Keep reading from the child + * } + * ); + * subprocess.communicate( + * // ReadLinesCallback contains StreamSplitter contains IOBuf, making + * // it noncopyable, whereas std::function must be copyable. 
So, we
+   *   // keep the callback in a local, and instead pass a reference.
+   *   std::ref(read_cb),
+   *   [](int pfd, int cfd){ return true; }  // Don't write to the child
+   * );
+   *
+   * If a file line exceeds maxLineLength, your callback will get some
+   * initial chunks of maxLineLength with no trailing delimiters.  The final
+   * chunk of a line is delimiter-terminated iff the delimiter was present
+   * in the input.  In particular, the last line in a file always lacks a
+   * delimiter -- so if a file ends on a delimiter, the final line is empty.
+   *
+   * Like a regular communicate() callback, your fdLineCb() normally returns
+   * false.  It may return true to tell Subprocess to close the underlying
+   * file descriptor.  The child process may then receive SIGPIPE or get
+   * EPIPE errors on writes.
+   *
+   * If reading from the pipe fails with anything other than EAGAIN, the
+   * stream is treated as finished: whatever is still buffered is flushed
+   * to your callback, and communicate() is told to close the pipe.
+   */
+  template <class Callback>
+  class ReadLinesCallback {
+   private:
+    // Binds an FD to the client-provided FD+line callback.
+    //
+    // NOTE: cb_ is a reference to the owning ReadLinesCallback's fdLineCb_
+    // member, so a ReadLinesCallback must not be moved once it has created
+    // its first splitter (i.e. once operator() has been called).
+    struct StreamSplitterCallback {
+      StreamSplitterCallback(Callback& cb, int fd) : cb_(cb), fd_(fd) { }
+      // The return value semantics are inverted vs StreamSplitter:
+      // fdLineCb returns true to stop, the splitter expects false to stop.
+      bool operator()(StringPiece s) { return !cb_(fd_, s); }
+      Callback& cb_;
+      int fd_;
+    };
+    typedef gen::StreamSplitter<StreamSplitterCallback> LineSplitter;
+   public:
+    /**
+     * fdLineCb       Invoked as fdLineCb(cfd, piece) for every piece; return
+     *                true to stop reading and close the pipe.  Taken by
+     *                rvalue reference and moved into this object.
+     * maxLineLength  Longest piece handed to fdLineCb; 0 means no limit.
+     * delimiter      Character that terminates a piece.
+     * bufSize        Bytes requested per read(); should be nonzero, since a
+     *                zero-length read returns 0 and would be mistaken for
+     *                end-of-file.
+     */
+    explicit ReadLinesCallback(
+      Callback&& fdLineCb,
+      uint64_t maxLineLength = 0,  // No line length limit by default
+      char delimiter = '\n',
+      uint64_t bufSize = 1024
+    ) : fdLineCb_(std::move(fdLineCb)),
+        maxLineLength_(maxLineLength),
+        delimiter_(delimiter),
+        bufSize_(bufSize) {}
+
+    /**
+     * The communicate() read callback: drains everything currently
+     * available on pfd and feeds it to the splitter for cfd.
+     *
+     * Returns false when the pipe has no more data for now (EAGAIN), and
+     * true when the pipe should be closed: at end-of-file, when fdLineCb
+     * asked to stop, or when read() failed with a non-EAGAIN error.
+     */
+    bool operator()(int pfd, int cfd) {
+      // Make a splitter for this cfd if it doesn't already exist
+      auto it = fdToSplitter_.find(cfd);
+      auto& splitter = (it != fdToSplitter_.end()) ? it->second
+        : fdToSplitter_.emplace(cfd, LineSplitter(
+            delimiter_, StreamSplitterCallback(fdLineCb_, cfd), maxLineLength_
+          )).first->second;
+      // Read as much as we can from this FD.
+      // NOTE(review): runtime-sized array is a compiler extension, not
+      // standard C++; kept as-is to avoid pulling in a new header here.
+      char buf[bufSize_];
+      while (true) {
+        ssize_t ret = readNoInt(pfd, buf, bufSize_);
+        if (ret == -1) {
+          if (errno == EAGAIN) {  // No more data for now
+            return false;
+          }
+          // Hard read error.  Never hand a negative length to StringPiece;
+          // deliver what we already buffered and have the pipe closed.
+          splitter.flush();
+          return true;
+        }
+        if (ret == 0) {  // Reached end-of-file
+          splitter.flush();  // Ignore return since the file is over anyway
+          return true;
+        }
+        if (!splitter(StringPiece(buf, ret))) {
+          return true;  // The callback told us to stop
+        }
+      }
+    }
+
+   private:
+    Callback fdLineCb_;
+    const uint64_t maxLineLength_;
+    const char delimiter_;
+    const uint64_t bufSize_;
+    // We lazily make splitters for all cfds that get used.
+    std::unordered_map<int, LineSplitter> fdToSplitter_;
+  };
+
+  // Helper to enable template deduction.  fdLineCb is expected to be an
+  // rvalue (a temporary lambda or a std::move()d functor); the
+  // ReadLinesCallback constructor moves from it.
+  template <class Callback>
+  static ReadLinesCallback<Callback> readLinesCallback(
+      Callback&& fdLineCb,
+      uint64_t maxLineLength = 0,  // No line length limit by default
+      char delimiter = '\n',
+      uint64_t bufSize = 1024) {
+    return ReadLinesCallback<Callback>(
+      std::forward<Callback>(fdLineCb), maxLineLength, delimiter, bufSize
+    );
+  }
+
   /**
    * Enable notifications (callbacks) for one pipe to/from child.  By default,
    * all are enabled.  Useful for "chatty" communication -- you want to disable
-- 
2.34.1