#include <folly/Conv.h>
#include <folly/String.h>
-#include <folly/io/IOBuf.h>
namespace folly {
namespace gen {
namespace detail {
-inline bool splitPrefix(StringPiece& in,
- StringPiece& prefix,
- StringPiece delimiter) {
- auto p = in.find(delimiter);
- if (p != std::string::npos) {
- prefix.assign(in.data(), in.data() + p);
- in.advance(p + delimiter.size());
- return true;
+/**
+ * Finds the first occurrence of delimiter in "in", advances "in" past the
+ * delimiter. Populates "prefix" with the consumed bytes, including the
+ * delimiter.
+ *
+ * Returns the number of trailing bytes of "prefix" that make up the
+ * delimiter, or 0 if the delimiter was not found.
+ */
+inline size_t splitPrefix(StringPiece& in,
+ StringPiece& prefix,
+ char delimiter) {
+ size_t found = in.find(delimiter);
+ if (found != StringPiece::npos) {
+ ++found;
+ prefix.assign(in.data(), in.data() + found);
+ in.advance(found);
+ return 1;
}
prefix.clear();
- return false;
+ return 0;
}
/**
- * Split by any of the EOL terms: \r, \n, or \r\n.
+ * As above, but supports multibyte delimiters.
*/
-inline bool splitPrefix(StringPiece& in,
- StringPiece& prefix,
- MixedNewlines) {
- auto newline = "\r\n";
- auto p = in.find_first_of(newline);
- if (p != std::string::npos) {
- prefix.assign(in.data(), in.data() + p);
- in.advance(p);
- if (!in.removePrefix(newline)) {
- in.advance(1);
- }
- return true;
+inline size_t splitPrefix(StringPiece& in,
+ StringPiece& prefix,
+ StringPiece delimiter) {
+ auto found = in.find(delimiter);
+ if (found != StringPiece::npos) {
+ found += delimiter.size();
+ prefix.assign(in.data(), in.data() + found);
+ in.advance(found);
+ return delimiter.size();
}
prefix.clear();
- return false;
+ return 0;
}
-inline bool splitPrefix(StringPiece& in, StringPiece& prefix, char delimiter) {
- auto p = static_cast<const char*>(memchr(in.data(), delimiter, in.size()));
- if (p) {
- prefix.assign(in.data(), p);
- in.assign(p + 1, in.end());
- return true;
+/**
+ * As above, but splits by any of the EOL terms: \r, \n, or \r\n.
+ */
+inline size_t splitPrefix(StringPiece& in,
+ StringPiece& prefix,
+ MixedNewlines) {
+ const auto kCRLF = "\r\n";
+ const size_t kLenCRLF = 2;
+
+ auto p = in.find_first_of(kCRLF);
+ if (p != std::string::npos) {
+ const auto in_start = in.data();
+ auto delim_len = 1;
+ in.advance(p);
+ // Either remove an MS-DOS CR-LF 2-byte newline, or eat 1 byte at a time.
+ if (in.removePrefix(kCRLF)) {
+ delim_len = kLenCRLF;
+ } else {
+ in.advance(delim_len);
+ }
+ prefix.assign(in_start, in.data());
+ return delim_len;
}
prefix.clear();
- return false;
+ return 0;
}
inline const char* ch(const unsigned char* p) {
return reinterpret_cast<const char*>(p);
}
+// Chop s into pieces of at most maxLength, feed them to cb
+template <class Callback>
+bool consumeFixedSizeChunks(Callback& cb, StringPiece& s, uint64_t maxLength) {
+ while (!s.empty()) {
+ auto num_to_add = s.size();
+ if (maxLength) {
+ num_to_add = std::min(num_to_add, maxLength);
+ }
+ if (!cb(StringPiece(s.begin(), num_to_add))) {
+ return false;
+ }
+ s.advance(num_to_add);
+ }
+ return true;
+}
+
+// Consumes all of buffer, plus n chars from s.
+template <class Callback>
+bool consumeBufferPlus(Callback& cb, IOBuf& buf, StringPiece& s, uint64_t n) {
+ buf.reserve(0, n);
+ memcpy(buf.writableTail(), s.data(), n);
+ buf.append(n);
+ s.advance(n);
+ if (!cb(StringPiece(detail::ch(buf.data()), buf.length()))) {
+ return false;
+ }
+ buf.clear();
+ return true;
+}
+
+} // namespace detail
+
+template <class Callback>
+bool StreamSplitter<Callback>::flush() {
+ CHECK(maxLength_ == 0 || buffer_.length() < maxLength_);
+ if (!pieceCb_(StringPiece(detail::ch(buffer_.data()), buffer_.length()))) {
+ return false;
+ }
+ // We are ready to handle another stream now.
+ buffer_.clear();
+ return true;
+}
+
+template <class Callback>
+bool StreamSplitter<Callback>::operator()(StringPiece in) {
+ StringPiece prefix;
+ // NB This code assumes a 1-byte delimiter. It's not too hard to support
+ // multibyte delimiters, just remember that maxLength_ chunks can end up
+ // falling in the middle of a delimiter.
+ bool found = detail::splitPrefix(in, prefix, delimiter_);
+ if (buffer_.length() != 0) {
+ if (found) {
+ uint64_t num_to_add = prefix.size();
+ if (maxLength_) {
+ CHECK(buffer_.length() < maxLength_);
+ // Consume as much of prefix as possible without exceeding maxLength_
+ num_to_add = std::min(maxLength_ - buffer_.length(), num_to_add);
+ }
+
+ // Append part of the prefix to the buffer, and send it to the callback
+ if (!detail::consumeBufferPlus(pieceCb_, buffer_, prefix, num_to_add)) {
+ return false;
+ }
+
+ if (!detail::consumeFixedSizeChunks(pieceCb_, prefix, maxLength_)) {
+ return false;
+ }
+
+ found = detail::splitPrefix(in, prefix, delimiter_);
+ // Post-conditions:
+ // - we consumed all of buffer_ and all of the first prefix.
+ // - found, in, and prefix reflect the second delimiter_ search
+ } else if (maxLength_ && buffer_.length() + in.size() >= maxLength_) {
+ // Send all of buffer_, plus a bit of in, to the callback
+ if (!detail::consumeBufferPlus(
+ pieceCb_, buffer_, in, maxLength_ - buffer_.length())) {
+ return false;
+ }
+ // Post-conditions:
+ // - we consumed all of buffer, and the minimal # of bytes from in
+ // - found is false
+ } // Otherwise: found is false & we cannot invoke the callback this turn
+ }
+ // Post-condition: buffer_ is nonempty only if found is false **and**
+ // len(buffer + in) < maxLength_.
+
+ // Send lines to callback directly from input (no buffer)
+ while (found) { // Buffer guaranteed to be empty
+ if (!detail::consumeFixedSizeChunks(pieceCb_, prefix, maxLength_)) {
+ return false;
+ }
+ found = detail::splitPrefix(in, prefix, delimiter_);
+ }
+
+ // No more delimiters left; consume 'in' until it is shorter than maxLength_
+ if (maxLength_) {
+ while (in.size() >= maxLength_) { // Buffer is guaranteed to be empty
+ if (!pieceCb_(StringPiece(in.begin(), maxLength_))) {
+ return false;
+ }
+ in.advance(maxLength_);
+ }
+ }
+
+ if (!in.empty()) { // Buffer may be nonempty
+ // Incomplete line left, append to buffer
+ buffer_.reserve(0, in.size());
+ memcpy(buffer_.writableTail(), in.data(), in.size());
+ buffer_.append(in.size());
+ }
+ CHECK(maxLength_ == 0 || buffer_.length() < maxLength_);
+ return true;
+}
+
+namespace detail {
+
class StringResplitter : public Operator<StringResplitter> {
char delimiter_;
public:
template <class Body>
bool apply(Body&& body) const {
- std::unique_ptr<IOBuf> buffer;
-
- auto fn = [&](StringPiece in) -> bool {
- StringPiece prefix;
- bool found = splitPrefix(in, prefix, this->delimiter_);
- if (found && buffer && buffer->length() != 0) {
- // Append to end of buffer, return line
- if (!prefix.empty()) {
- buffer->reserve(0, prefix.size());
- memcpy(buffer->writableTail(), prefix.data(), prefix.size());
- buffer->append(prefix.size());
- }
- if (!body(StringPiece(ch(buffer->data()), buffer->length()))) {
- return false;
- }
- buffer->clear();
- found = splitPrefix(in, prefix, this->delimiter_);
- }
- // Buffer is empty, return lines directly from input (no buffer)
- while (found) {
- if (!body(prefix)) {
- return false;
- }
- found = splitPrefix(in, prefix, this->delimiter_);
- }
- if (!in.empty()) {
- // Incomplete line left, append to buffer
- if (!buffer) {
- // Arbitrarily assume that we have half a line and get enough
- // room for twice that.
- constexpr size_t kDefaultLineSize = 256;
- buffer = IOBuf::create(std::max(kDefaultLineSize, 2 * in.size()));
- }
- buffer->reserve(0, in.size());
- memcpy(buffer->writableTail(), in.data(), in.size());
- buffer->append(in.size());
- }
- return true;
- };
-
- // Iterate
- if (!source_.apply(std::move(fn))) {
+ auto splitter =
+ streamSplitter(this->delimiter_, [this, &body](StringPiece s) {
+ // The stream ended with a delimiter; our contract is to swallow
+ // the final empty piece.
+ if (s.empty()) {
+ return false;
+ }
+ if (s.back() != this->delimiter_) {
+ return body(s);
+ }
+ s.pop_back(); // Remove the 1-character delimiter
+ return body(s);
+ });
+ if (!source_.apply(splitter)) {
return false;
}
-
- // Incomplete last line
- if (buffer && buffer->length() != 0) {
- if (!body(StringPiece(ch(buffer->data()), buffer->length()))) {
- return false;
- }
- }
- return true;
+ return splitter.flush();
}
static constexpr bool infinite = Source::infinite;
bool apply(Body&& body) const {
StringPiece rest(source_);
StringPiece prefix;
- while (splitPrefix(rest, prefix, this->delimiter_)) {
+ while (size_t delim_len = splitPrefix(rest, prefix, this->delimiter_)) {
+ prefix.subtract(delim_len); // Remove the delimiter
if (!body(prefix)) {
return false;
}
#include <folly/Range.h>
#include <folly/gen/Base.h>
+#include <folly/io/IOBuf.h>
namespace folly {
namespace gen {
*
* resplit() behaves as if the input strings were concatenated into one long
* string and then split.
+ *
+ * Equivalently, you can use StreamSplitter outside of a folly::gen setting.
*/
// make this a template so we don't require StringResplitter to be complete
// until use
to<fbstring>(delim)));
}
+/**
+ * Outputs exactly the same bytes as the input stream, in different chunks.
+ * A chunk boundary occurs after each delimiter, or, if maxLength is
+ * non-zero, after maxLength bytes, whichever comes first. Your callback
+ * can return false to stop consuming the stream at any time.
+ *
+ * The splitter buffers the last incomplete chunk, so you must call flush()
+ * to consume the piece of the stream after the final delimiter. This piece
+ * may be empty. After a flush(), the splitter can be re-used for a new
+ * stream.
+ *
+ * operator() and flush() return false iff your callback returns false. The
+ * internal buffer is not flushed, so reusing such a splitter will have
+ * indeterminate results. Same goes if your callback throws. Feel free to
+ * fix these corner cases if needed.
+ *
+ * Tips:
+ * - Create via streamSplitter() to take advantage of template deduction.
+ * - If your callback needs an end-of-stream signal, test for "no
+ * trailing delimiter **and** shorter than maxLength".
+ * - You can fine-tune the initial capacity of the internal IOBuf.
+ */
+template <class Callback>
+class StreamSplitter {
+
+ public:
+ StreamSplitter(char delimiter,
+ Callback&& pieceCb,
+ uint64_t maxLength = 0,
+ uint64_t initialCapacity = 0)
+ : buffer_(IOBuf::CREATE, initialCapacity),
+ delimiter_(delimiter),
+ maxLength_(maxLength),
+ pieceCb_(std::move(pieceCb)) {}
+
+ /**
+ * Consume any incomplete last line (may be empty). Do this before
+ * destroying the StreamSplitter, or you will fail to consume part of the
+ * input.
+ *
+ * After flush() you may proceed to consume the next stream via ().
+ *
+ * Returns false if the callback wants no more data, true otherwise.
+ * A return value of false means that this splitter must no longer be used.
+ */
+ bool flush();
+
+ /**
+ * Consume another piece of the input stream.
+ *
+ * Returns false only if your callback refuses to consume more data by
+ * returning false (true otherwise). A return value of false means that
+ * this splitter must no longer be used.
+ */
+ bool operator()(StringPiece in);
+
+ private:
+ // Holds the current "incomplete" chunk so that chunks can span calls to ()
+ IOBuf buffer_;
+ char delimiter_;
+ uint64_t maxLength_; // The callback never gets more chars than this
+ Callback pieceCb_;
+};
+
+template <class Callback> // Helper to enable template deduction
+StreamSplitter<Callback> streamSplitter(char delimiter,
+ Callback&& pieceCb,
+ uint64_t capacity = 0) {
+ return StreamSplitter<Callback>(delimiter, std::move(pieceCb), capacity);
+}
+
} // namespace gen
} // namespace folly
TEST(StringGen, SplitByNewLine) {
auto collect = eachTo<std::string>() | as<vector>();
{
- auto pieces = lines("hello\n\n world\r\n goodbye\r meow") | collect;
- EXPECT_EQ(5, pieces.size());
+ auto pieces = lines("hello\n\n world\r\n goodbye\r me\n\row") | collect;
+ EXPECT_EQ(7, pieces.size());
EXPECT_EQ("hello", pieces[0]);
EXPECT_EQ("", pieces[1]);
EXPECT_EQ(" world", pieces[2]);
EXPECT_EQ(" goodbye", pieces[3]);
- EXPECT_EQ(" meow", pieces[4]);
+ EXPECT_EQ(" me", pieces[4]);
+ EXPECT_EQ("", pieces[5]);
+ EXPECT_EQ("ow", pieces[6]);
}
}
}
}
+void checkResplitMaxLength(vector<string> ins,
+ char delim,
+ uint64_t maxLength,
+ vector<string> outs) {
+ vector<std::string> pieces;
+ auto splitter = streamSplitter(delim, [&pieces](StringPiece s) {
+ pieces.push_back(string(s.begin(), s.end()));
+ return true;
+ }, maxLength);
+ for (const auto& in : ins) {
+ splitter(in);
+ }
+ splitter.flush();
+
+ EXPECT_EQ(outs.size(), pieces.size());
+ for (int i = 0; i < outs.size(); ++i) {
+ EXPECT_EQ(outs[i], pieces[i]);
+ }
+
+ // Also check the concatenated input against the same output
+ if (ins.size() > 1) {
+ checkResplitMaxLength({folly::join("", ins)}, delim, maxLength, outs);
+ }
+}
+
+TEST(StringGen, ResplitMaxLength) {
+ checkResplitMaxLength(
+ {"hel", "lo,", ", world", ", goodbye, m", "ew"}, ',', 5,
+ {"hello", ",", ",", " worl", "d,", " good", "bye,", " mew"}
+ );
+ // " meow" cannot be "end of stream", since it's maxLength long
+ checkResplitMaxLength(
+ {"hel", "lo,", ", world", ", goodbye, m", "eow"}, ',', 5,
+ {"hello", ",", ",", " worl", "d,", " good", "bye,", " meow", ""}
+ );
+ checkResplitMaxLength(
+ {"||", "", "", "", "|a|b", "cdefghijklmn", "|opqrst",
+ "uvwx|y|||", "z", "0123456789", "|", ""}, '|', 2,
+ {"|", "|", "|", "a|", "bc", "de", "fg", "hi", "jk", "lm", "n|", "op", "qr",
+ "st", "uv", "wx", "|", "y|", "|", "|", "z0", "12", "34", "56", "78", "9|",
+ ""}
+ );
+}
+
template<typename F>
void runUnsplitSuite(F fn) {
fn("hello, world");