+#include <type_traits>
+#include <vector>
#include <boost/operators.hpp>
#include <boost/noncopyable.hpp>
+#include <folly/Exception.h>
+#include <folly/FileUtil.h>
+#include <folly/gen/String.h>
#include <folly/io/IOBufQueue.h>
#include <folly/MapUtil.h>
#include <folly/Portability.h>
* identifying the stream; 0 = child's standard input, etc)
*
* The read and write callbacks must read from / write to pfd and return
- * false during normal operation or true at end-of-file;
- * communicate() will then close the pipe. Note that pfd is
- * nonblocking, so be prepared for read() / write() to return -1 and
- * set errno to EAGAIN (in which case you should return false).
+ * false during normal operation. Return true to tell communicate() to
+ * close the pipe. For readCallback, this might send SIGPIPE to the
+ * child, or make its writes fail with EPIPE, so you should generally
+ * avoid returning true unless you've reached end-of-file.
*
* NOTE that you MUST consume all data passed to readCallback (or return
- * true, which will close the pipe, possibly sending SIGPIPE to the child or
- * making its writes fail with EPIPE), and you MUST write to a writable pipe
- * (or return true, which will close the pipe). To do otherwise is an
- * error. You must do this even for pipes you are not interested in.
+ * true to close the pipe). Similarly, you MUST write to a writable pipe
+ * (or return true to close the pipe). To do otherwise is an error that
+ * can result in a deadlock. You must do this even for pipes you are not
+ * interested in.
+ *
+ * Note that pfd is nonblocking, so be prepared for read() / write() to
+ * return -1 and set errno to EAGAIN (in which case you should return
+ * false). Use readNoInt() from FileUtil.h to handle interrupted reads
+ * for you.
*
* Note that communicate() returns when all pipes to/from the child are
* closed; the child might stay alive after that, so you must still wait().
*
* Most users won't need to use this; the simpler version of communicate
* (which buffers data in memory) will probably work fine.
+ *
+ * See ReadLinesCallback for an easy way to consume the child's output
+ * streams line-by-line (or tokenized by another delimiter).
*/
typedef std::function<bool(int, int)> FdCallback;
void communicate(FdCallback readCallback, FdCallback writeCallback);
+ /**
+ * A readCallback for Subprocess::communicate() that helps you consume
+ * lines (or other delimited pieces) from your subprocess's file
+ * descriptors. Use the readLinesCallback() helper to get template
+ * deduction. For example:
+ *
+ * auto read_cb = Subprocess::readLinesCallback(
+ * [](int fd, folly::StringPiece s) {
+ * std::cout << fd << " said: " << s;
+ * return false; // Keep reading from the child
+ * }
+ * );
+ * subprocess.communicate(
+ * // ReadLinesCallback contains StreamSplitter contains IOBuf, making
+ * // it noncopyable, whereas std::function must be copyable. So, we
+ * // keep the callback in a local, and instead pass a reference.
+ * std::ref(read_cb),
+ * [](int pfd, int cfd){ return true; } // Don't write to the child
+ * );
+ *
+ * If a file line exceeds maxLineLength, your callback will get some
+ * initial chunks of maxLineLength with no trailing delimiters. The final
+ * chunk of a line is delimiter-terminated iff the delimiter was present
+ * in the input. In particular, the last line in a file always lacks a
+ * delimiter -- so if a file ends on a delimiter, the final line is empty.
+ *
+ * Like a regular communicate() callback, your fdLineCb() normally returns
+ * false. It may return true to tell Subprocess to close the underlying
+ * file descriptor. The child process may then receive SIGPIPE or get
+ * EPIPE errors on writes.
+ */
+  template <class Callback>
+  class ReadLinesCallback {
+   private:
+    // Binds an FD to the client-provided FD+line callback, so that it can
+    // be handed to StreamSplitter, which invokes it with just the piece.
+    //
+    // NOTE(review): cb_ refers to the enclosing object's fdLineCb_ member,
+    // so moving a ReadLinesCallback after operator() has created splitters
+    // looks like it would leave this reference dangling -- confirm against
+    // how StreamSplitter stores its callback.
+    struct StreamSplitterCallback {
+      StreamSplitterCallback(Callback& cb, int fd) : cb_(cb), fd_(fd) { }
+      // The return value semantics are inverted vs StreamSplitter: the
+      // client returns true to stop reading, so we hand back the negation.
+      bool operator()(StringPiece s) { return !cb_(fd_, s); }
+      Callback& cb_;
+      int fd_;
+    };
+    typedef gen::StreamSplitter<StreamSplitterCallback> LineSplitter;
+
+   public:
+    /**
+     * fdLineCb      -- invoked as fdLineCb(cfd, piece) for every piece;
+     *                  return true to have the pipe closed.
+     * maxLineLength -- forwarded to the splitter; 0 means no limit.
+     * delimiter     -- the character that terminates a piece.
+     * bufSize       -- size in bytes of the scratch buffer used per read().
+     */
+    explicit ReadLinesCallback(
+      Callback&& fdLineCb,
+      uint64_t maxLineLength = 0,  // No line length limit by default
+      char delimiter = '\n',
+      uint64_t bufSize = 1024
+    ) : fdLineCb_(std::move(fdLineCb)),
+        maxLineLength_(maxLineLength),
+        delimiter_(delimiter),
+        bufSize_(bufSize) {}
+
+    /**
+     * The communicate() read callback: drains pfd, feeding everything that
+     * was read into the splitter associated with cfd.
+     *
+     * Returns false when the pipe has no more data for now (EAGAIN), and
+     * true at end-of-file or once the client callback asked to stop.
+     * Throws (via checkUnixError) if read() fails for any other reason.
+     */
+    bool operator()(int pfd, int cfd) {
+      // Make a splitter for this cfd if it doesn't already exist
+      auto it = fdToSplitter_.find(cfd);
+      auto& splitter = (it != fdToSplitter_.end()) ? it->second
+        : fdToSplitter_.emplace(cfd, LineSplitter(
+            delimiter_, StreamSplitterCallback(fdLineCb_, cfd), maxLineLength_
+          )).first->second;
+      // A runtime-sized `char buf[bufSize_]` is a compiler extension, not
+      // standard C++, and puts a caller-chosen amount of data on the stack.
+      // Use a heap-backed buffer instead.
+      std::vector<char> buf(bufSize_);
+      // Read as much as we can from this FD
+      while (true) {
+        ssize_t ret = readNoInt(pfd, buf.data(), buf.size());
+        if (ret == -1 && errno == EAGAIN) {  // No more data for now
+          return false;
+        }
+        // Any other failure must not fall through: a ret of -1 would be
+        // converted to an enormous length when building the StringPiece.
+        checkUnixError(ret, "read");
+        if (ret == 0) {  // Reached end-of-file
+          splitter.flush();  // Ignore return since the file is over anyway
+          return true;
+        }
+        if (!splitter(StringPiece(buf.data(), ret))) {
+          return true;  // The callback told us to stop
+        }
+      }
+    }
+
+   private:
+    Callback fdLineCb_;
+    const uint64_t maxLineLength_;
+    const char delimiter_;
+    const uint64_t bufSize_;
+    // We lazily make splitters for all cfds that get used.
+    std::unordered_map<int, LineSplitter> fdToSplitter_;
+  };
+
+  // Helper to enable template deduction.
+  //
+  // fdLineCb is taken by forwarding reference, so the stored callback type
+  // is the decayed type: an rvalue callable is moved into the result, while
+  // an lvalue callable is copied (and left untouched) rather than producing
+  // a ReadLinesCallback instantiated on a reference type.  The remaining
+  // arguments are passed straight to the ReadLinesCallback constructor.
+  template <class Callback>
+  static ReadLinesCallback<typename std::decay<Callback>::type>
+  readLinesCallback(
+      Callback&& fdLineCb,
+      uint64_t maxLineLength = 0,  // No line length limit by default
+      char delimiter = '\n',
+      uint64_t bufSize = 1024) {
+    typedef typename std::decay<Callback>::type StoredCallback;
+    // The constructor takes an rvalue, so materialize a temporary of the
+    // stored type from whatever value category we were given.
+    return ReadLinesCallback<StoredCallback>(
+      StoredCallback(std::forward<Callback>(fdLineCb)),
+      maxLineLength, delimiter, bufSize
+    );
+  }
+
/**
* Enable notifications (callbacks) for one pipe to/from child. By default,
* all are enabled. Useful for "chatty" communication -- you want to disable