From eed46f426a62a15ffae8a2e73ca48617754f0a76 Mon Sep 17 00:00:00 2001 From: Tudor Bosman Date: Wed, 2 May 2012 16:42:35 -0700 Subject: [PATCH] Stream operations: file access, iteration, splitting. Summary: Intended to complement and replace strings::byLine. Test Plan: stream_test Reviewed By: delong.j@fb.com FB internal diff: D463341 --- folly/Range.h | 11 +- folly/experimental/io/Stream-inl.h | 98 ++++++++++ folly/experimental/io/Stream.cpp | 98 ++++++++++ folly/experimental/io/Stream.h | 226 ++++++++++++++++++++++ folly/experimental/io/test/StreamTest.cpp | 117 +++++++++++ 5 files changed, 549 insertions(+), 1 deletion(-) create mode 100644 folly/experimental/io/Stream-inl.h create mode 100644 folly/experimental/io/Stream.cpp create mode 100644 folly/experimental/io/Stream.h create mode 100644 folly/experimental/io/test/StreamTest.cpp diff --git a/folly/Range.h b/folly/Range.h index 65b7441d..ac316bc5 100644 --- a/folly/Range.h +++ b/folly/Range.h @@ -187,7 +187,8 @@ public: // Allow implicit conversion from Range (aka StringPiece) to // Range (aka ByteRange), as they're both frequently - // used to represent ranges of bytes. + // used to represent ranges of bytes. Allow explicit conversion in the other + // direction. template ::value && std::is_same::value), int>::type = 0> @@ -196,6 +197,14 @@ public: e_(reinterpret_cast(other.end())) { } + template ::value && + std::is_same::value), int>::type = 0> + explicit Range(const Range& other) + : b_(reinterpret_cast(other.begin())), + e_(reinterpret_cast(other.end())) { + } + void clear() { b_ = Iter(); e_ = Iter(); diff --git a/folly/experimental/io/Stream-inl.h b/folly/experimental/io/Stream-inl.h new file mode 100644 index 00000000..d4a7f3f4 --- /dev/null +++ b/folly/experimental/io/Stream-inl.h @@ -0,0 +1,98 @@ +/* + * Copyright 2012 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FOLLY_IO_STREAM_H_ +#error This file may only be included from Stream.h +#endif + +#include + +#include + +namespace folly { + +template +InputByteStreamSplitter::InputByteStreamSplitter( + char delimiter, Stream stream) + : done_(false), + delimiter_(delimiter), + stream_(std::move(stream)) { +} + +template +bool InputByteStreamSplitter::operator()(ByteRange& chunk) { + DCHECK_EQ(buffer_->length(), 0); + chunk.clear(); + if (rest_.empty()) { + if (done_) { + return false; + } else if (!stream_(rest_)) { + done_ = true; + return false; + } + } + + auto p = static_cast(memchr(rest_.data(), delimiter_, + rest_.size())); + if (p) { + chunk.assign(rest_.data(), p); + rest_.assign(p + 1, rest_.end()); + return true; + } + + // Incomplete line read, copy to buffer + if (!buffer_) { + static const size_t kDefaultLineSize = 256; + // Arbitrarily assume that we have half of a line in rest_, and + // get enough room for twice that. + buffer_ = IOBuf::create(std::max(kDefaultLineSize, 2 * rest_.size())); + } else { + buffer_->reserve(0, rest_.size()); + } + memcpy(buffer_->writableTail(), rest_.data(), rest_.size()); + buffer_->append(rest_.size()); + + while (stream_(rest_)) { + auto p = static_cast( + memchr(rest_.data(), delimiter_, rest_.size())); + if (p) { + // Copy everything up to the delimiter and return it + size_t n = p - rest_.data(); + buffer_->reserve(0, n); + memcpy(buffer_->writableTail(), rest_.data(), n); + buffer_->append(n); + chunk.reset(buffer_->data(), buffer_->length()); + buffer_->trimStart(buffer_->length()); + rest_.assign(p + 1, rest_.end()); + return true; + } + + // Nope, copy the entire chunk that we read + buffer_->reserve(0, rest_.size()); + memcpy(buffer_->writableTail(), rest_.data(), rest_.size()); + buffer_->append(rest_.size()); + } + + // Incomplete last line + done_ = true; + rest_.clear(); + chunk.reset(buffer_->data(), buffer_->length()); + buffer_->trimStart(buffer_->length()); + return true; +} + +} // namespace folly + diff --git a/folly/experimental/io/Stream.cpp b/folly/experimental/io/Stream.cpp new file mode 100644 index 00000000..2f519dcd --- /dev/null +++ b/folly/experimental/io/Stream.cpp @@ -0,0 +1,98 @@ +/* + * Copyright 2012 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "folly/experimental/io/Stream.h" + +#include +#include +#include + +#include +#include + +#include "folly/String.h" + +namespace folly { + +FileInputByteStream::FileInputByteStream(int fd, bool ownsFd, size_t bufferSize) + : fd_(fd), + ownsFd_(ownsFd), + buffer_(IOBuf::create(bufferSize)) { +} + +FileInputByteStream::FileInputByteStream(int fd, bool ownsFd, + std::unique_ptr&& buffer) + : fd_(fd), + ownsFd_(ownsFd), + buffer_(std::move(buffer)) { + buffer_->clear(); +} + +bool FileInputByteStream::operator()(ByteRange& chunk) { + ssize_t n = ::read(fd_, buffer_->writableTail(), buffer_->capacity()); + if (n == -1) { + throw std::system_error(errno, std::system_category(), "read failed"); + } + chunk.reset(buffer_->tail(), n); + return (n != 0); +} + +FileInputByteStream::FileInputByteStream(FileInputByteStream&& other) + : fd_(other.fd_), + ownsFd_(other.ownsFd_), + buffer_(std::move(other.buffer_)) { + other.fd_ = -1; + other.ownsFd_ = false; +} + +FileInputByteStream& FileInputByteStream::operator=( + FileInputByteStream&& other) { + if (&other != this) { + closeNoThrow(); + fd_ = other.fd_; + ownsFd_ = other.ownsFd_; + buffer_ = std::move(other.buffer_); + other.fd_ = -1; + other.ownsFd_ = false; + } + return *this; +} + +FileInputByteStream::~FileInputByteStream() { + closeNoThrow(); +} + +void FileInputByteStream::closeNoThrow() { + if (!ownsFd_) { + return; + } + ownsFd_ = false; + if (::close(fd_) == -1) { + PLOG(ERROR) << "close failed"; + } +} + +InputByteStreamSplitter byLine( + const char* fileName, char delim) { + int fd = ::open(fileName, O_RDONLY); + if (fd == -1) { + throw std::system_error(errno, std::system_category(), "open failed"); + } + return makeInputByteStreamSplitter(delim, FileInputByteStream(fd, true)); +} + +} // namespace folly + diff --git a/folly/experimental/io/Stream.h b/folly/experimental/io/Stream.h new file mode 100644 index 00000000..15b58aa7 --- /dev/null +++ b/folly/experimental/io/Stream.h @@ -0,0 +1,226 @@ +/* + * Copyright 2012 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FOLLY_IO_STREAM_H_ +#define FOLLY_IO_STREAM_H_ + +#include +#include + +#include "folly/Range.h" +#include "folly/FBString.h" +#include "folly/experimental/io/IOBuf.h" + +namespace folly { + +/** + * An InputByteStream is a functional object with the following signature: + * + * bool operator()(ByteRange& data); + * + * Input byte streams must be movable. + * + * The stream returns false at EOF; otherwise, it returns true and sets data to + * the next chunk of data from the stream. The memory that data points to must + * remain valid until the next call to the stream. In case of error, the + * stream throws an exception. + * + * The meaning of a "chunk" is left up to the stream implementation. Some + * streams return chunks limited to the size of an internal buffer. Other + * streams return the entire input as one (potentially huge) ByteRange. + * Others assign meaning to chunks: StreamSplitter returns "lines" -- sequences + * of bytes between delimiters. This ambiguity is intentional; resolving it + * would significantly increase the complexity of the code. + * + * An OutputByteStream is an object with the following signature: + * + * void operator()(ByteRange data); + * void close(); + * + * Output byte streams must be movable. + * + * The stream appends a chunk of data to the stream when calling operator(). + * close() closes the stream, allowing us to detect any errors before + * destroying the stream object (to avoid throwing exceptions from the + * destructor). The destructor must close the stream if close() was not + * explicitly called, and abort the program if closing the stream caused + * an error. + * + * Just like with input byte streams, the meaning of a "chunk" is left up + * to the stream implementation. Some streams will just append all chunks + * as given; others might assign meaning to chunks and (for example) append + * delimiters between chunks. + */ + +template class InputByteStreamIterator; + +/** + * Convenient base class template to derive all streams from; provides begin() + * and end() for iterator access. This class makes use of the curriously + * recurring template pattern; your stream class S may derive from + * InputByteStreamBase. + * + * Deriving from InputByteStreamBase is not required, but is convenient. + */ +template +class InputByteStreamBase { + public: + InputByteStreamIterator begin() { + return InputByteStreamIterator(static_cast(*this)); + } + + InputByteStreamIterator end() { + return InputByteStreamIterator(); + } + + InputByteStreamBase() { } + InputByteStreamBase(InputByteStreamBase&&) = default; + InputByteStreamBase& operator=(InputByteStreamBase&&) = default; + + private: + InputByteStreamBase(const InputByteStreamBase&) = delete; + InputByteStreamBase& operator=(const InputByteStreamBase&) = delete; +}; + +/** + * Stream iterator + */ +template +class InputByteStreamIterator + : public boost::iterator_facade< + InputByteStreamIterator, + const ByteRange, + boost::single_pass_traversal_tag> { + public: + InputByteStreamIterator() : stream_(nullptr) { } + + explicit InputByteStreamIterator(Stream& stream) : stream_(&stream) { + increment(); + } + + private: + friend class boost::iterator_core_access; + + void increment() { + DCHECK(stream_); + if (stream_ && !(*stream_)(chunk_)) { + stream_ = nullptr; + } + } + + // This is a single pass iterator, so all we care about is that + // equal forms an equivalence class on the subset of iterators that it's + // defined on. In our case, only identical (same object) iterators and + // past-the-end iterators compare equal. (so that it != end() works) + bool equal(const InputByteStreamIterator& other) const { + return (this == &other) || (!stream_ && !other.stream_); + } + + const ByteRange& dereference() const { + DCHECK(stream_); + return chunk_; + } + + Stream* stream_; + ByteRange chunk_; +}; + +/** + * Stream that read()s from a file. + */ +class FileInputByteStream : public InputByteStreamBase { + public: + static const size_t kDefaultBufferSize = 4096; + explicit FileInputByteStream(int fd, + bool ownsFd = false, + size_t bufferSize = kDefaultBufferSize); + FileInputByteStream(int fd, bool ownsFd, std::unique_ptr&& buffer); + FileInputByteStream(FileInputByteStream&& other); + FileInputByteStream& operator=(FileInputByteStream&& other); + ~FileInputByteStream(); + bool operator()(ByteRange& chunk); + + private: + void closeNoThrow(); + + int fd_; + bool ownsFd_; + std::unique_ptr buffer_; +}; + +/** + * Split a stream on a delimiter. Returns "lines" between delimiters; + * the delimiters are not included in the returned string. + * + * Note that the InputByteStreamSplitter acts as a stream itself, and you can + * iterate over it. + */ +template +class InputByteStreamSplitter + : public InputByteStreamBase> { + public: + InputByteStreamSplitter(char delimiter, Stream stream); + bool operator()(ByteRange& chunk); + + InputByteStreamSplitter(InputByteStreamSplitter&&) = default; + InputByteStreamSplitter& operator=(InputByteStreamSplitter&&) = default; + + private: + InputByteStreamSplitter(const InputByteStreamSplitter&) = delete; + InputByteStreamSplitter& operator=(const InputByteStreamSplitter&) = delete; + + bool done_; + char delimiter_; + Stream stream_; + std::unique_ptr buffer_; + ByteRange rest_; +}; + +/** + * Shortcut to create a stream splitter around a stream and deduce + * the type of the template argument. + */ +template +InputByteStreamSplitter makeInputByteStreamSplitter( + char delimiter, Stream stream) { + return InputByteStreamSplitter(delimiter, std::move(stream)); +} + +/** + * Create a stream that splits a file into chunks (default: lines, with + * '\n' as the delimiter) + */ +InputByteStreamSplitter byLine( + const char* fileName, char delim='\n'); + +// overload for std::string +inline InputByteStreamSplitter byLine( + const std::string& fileName, char delim='\n') { + return byLine(fileName.c_str(), delim); +} + +// overload for fbstring +inline InputByteStreamSplitter byLine( + const fbstring& fileName, char delim='\n') { + return byLine(fileName.c_str(), delim); +} + +} // namespace folly + +#include "folly/experimental/io/Stream-inl.h" + +#endif /* FOLLY_IO_STREAM_H_ */ + diff --git a/folly/experimental/io/test/StreamTest.cpp b/folly/experimental/io/test/StreamTest.cpp new file mode 100644 index 00000000..f0d3603a --- /dev/null +++ b/folly/experimental/io/test/StreamTest.cpp @@ -0,0 +1,117 @@ +/* + * Copyright 2012 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "folly/experimental/io/Stream.h" + +#include +#include + +#include +#include + +#include "folly/Benchmark.h" +#include "folly/experimental/TestUtil.h" + +using namespace folly; + +namespace { + +std::vector streamSplit(const std::string& str, char delimiter, + size_t maxChunkSize = (size_t)-1) { + size_t pos = 0; + auto cb = [&] (ByteRange& sp) mutable -> bool { + if (pos == str.size()) return false; + size_t n = std::min(str.size() - pos, maxChunkSize); + sp.reset(reinterpret_cast(&(str[pos])), n); + pos += n; + return true; + }; + + std::vector result; + for (auto line : makeInputByteStreamSplitter(delimiter, cb)) { + result.push_back(StringPiece(line).str()); + } + + return result; +} + +} // namespace + +TEST(InputByteStreamSplitter, Empty) { + { + auto pieces = streamSplit("", ','); + EXPECT_EQ(0, pieces.size()); + } + + // The last delimiter is eaten, just like std::getline + { + auto pieces = streamSplit(",", ','); + EXPECT_EQ(1, pieces.size()); + EXPECT_EQ("", pieces[0]); + } + + { + auto pieces = streamSplit(",,", ','); + EXPECT_EQ(2, pieces.size()); + EXPECT_EQ("", pieces[0]); + EXPECT_EQ("", pieces[1]); + } +} + +TEST(InputByteStreamSplitter, Simple) { + std::string str = "hello,, world, goodbye, meow"; + + for (size_t chunkSize = 1; chunkSize <= str.size(); ++chunkSize) { + auto pieces = streamSplit(str, ',', chunkSize); + EXPECT_EQ(5, pieces.size()); + EXPECT_EQ("hello", pieces[0]); + EXPECT_EQ("", pieces[1]); + EXPECT_EQ(" world", pieces[2]); + EXPECT_EQ(" goodbye", pieces[3]); + EXPECT_EQ(" meow", pieces[4]); + } +} + +TEST(ByLine, Simple) { + test::TemporaryFile file("ByLine"); + static const std::string lines( + "Hello world\n" + "This is the second line\n" + "\n" + "\n" + "a few empty lines above\n" + "incomplete last line"); + EXPECT_EQ(lines.size(), write(file.fd(), lines.data(), lines.size())); + + auto expected = streamSplit(lines, '\n'); + std::vector found; + for (auto& line : byLine(file.path())) { + found.push_back(StringPiece(line).str()); + } + + EXPECT_TRUE(expected == found); +} + +int main(int argc, char *argv[]) { + testing::InitGoogleTest(&argc, argv); + google::ParseCommandLineFlags(&argc, &argv, true); + auto ret = RUN_ALL_TESTS(); + if (!ret) { + folly::runBenchmarksOnFlag(); + } + return ret; +} + -- 2.34.1