return output;
}
+uint64_t Codec::maxCompressedLength(uint64_t uncompressedLength) const {
+ if (uncompressedLength == 0) {
+ return 0;
+ }
+ return doMaxCompressedLength(uncompressedLength);
+}
+
+Optional<uint64_t> Codec::getUncompressedLength(
+ const folly::IOBuf* data,
+ Optional<uint64_t> uncompressedLength) const {
+ auto const compressedLength = data->computeChainDataLength();
+ if (uncompressedLength == uint64_t(0) || compressedLength == 0) {
+ if (uncompressedLength.value_or(0) != 0 || compressedLength != 0) {
+ throw std::runtime_error("Invalid uncompressed length");
+ }
+ return 0;
+ }
+ return doGetUncompressedLength(data, uncompressedLength);
+}
+
+Optional<uint64_t> Codec::doGetUncompressedLength(
+ const folly::IOBuf*,
+ Optional<uint64_t> uncompressedLength) const {
+ return uncompressedLength;
+}
+
+bool StreamCodec::needsDataLength() const {
+ return doNeedsDataLength();
+}
+
+bool StreamCodec::doNeedsDataLength() const {
+ return false;
+}
+
+void StreamCodec::assertStateIs(State expected) const {
+ if (state_ != expected) {
+ throw std::logic_error(folly::to<std::string>(
+ "Codec: state is ", state_, "; expected state ", expected));
+ }
+}
+
+void StreamCodec::resetStream(Optional<uint64_t> uncompressedLength) {
+ state_ = State::RESET;
+ uncompressedLength_ = uncompressedLength;
+ doResetStream();
+}
+
+bool StreamCodec::compressStream(
+ ByteRange& input,
+ MutableByteRange& output,
+ StreamCodec::FlushOp flushOp) {
+ if (state_ == State::RESET && input.empty()) {
+ if (flushOp == StreamCodec::FlushOp::NONE) {
+ return false;
+ }
+ if (flushOp == StreamCodec::FlushOp::END &&
+ uncompressedLength().value_or(0) != 0) {
+ throw std::runtime_error("Codec: invalid uncompressed length");
+ }
+ return true;
+ }
+ if (state_ == State::RESET && !input.empty() &&
+ uncompressedLength() == uint64_t(0)) {
+ throw std::runtime_error("Codec: invalid uncompressed length");
+ }
+ // Handle input state transitions
+ switch (flushOp) {
+ case StreamCodec::FlushOp::NONE:
+ if (state_ == State::RESET) {
+ state_ = State::COMPRESS;
+ }
+ assertStateIs(State::COMPRESS);
+ break;
+ case StreamCodec::FlushOp::FLUSH:
+ if (state_ == State::RESET || state_ == State::COMPRESS) {
+ state_ = State::COMPRESS_FLUSH;
+ }
+ assertStateIs(State::COMPRESS_FLUSH);
+ break;
+ case StreamCodec::FlushOp::END:
+ if (state_ == State::RESET || state_ == State::COMPRESS) {
+ state_ = State::COMPRESS_END;
+ }
+ assertStateIs(State::COMPRESS_END);
+ break;
+ }
+ bool const done = doCompressStream(input, output, flushOp);
+ // Handle output state transitions
+ if (done) {
+ if (state_ == State::COMPRESS_FLUSH) {
+ state_ = State::COMPRESS;
+ } else if (state_ == State::COMPRESS_END) {
+ state_ = State::END;
+ }
+ // Check internal invariants
+ DCHECK(input.empty());
+ DCHECK(flushOp != StreamCodec::FlushOp::NONE);
+ }
+ return done;
+}
+
+bool StreamCodec::uncompressStream(
+ ByteRange& input,
+ MutableByteRange& output,
+ StreamCodec::FlushOp flushOp) {
+ if (state_ == State::RESET && input.empty()) {
+ if (uncompressedLength().value_or(0) == 0) {
+ return true;
+ }
+ return false;
+ }
+ // Handle input state transitions
+ if (state_ == State::RESET) {
+ state_ = State::UNCOMPRESS;
+ }
+ assertStateIs(State::UNCOMPRESS);
+ bool const done = doUncompressStream(input, output, flushOp);
+ // Handle output state transitions
+ if (done) {
+ state_ = State::END;
+ }
+ return done;
+}
+
+static std::unique_ptr<IOBuf> addOutputBuffer(
+ MutableByteRange& output,
+ uint64_t size) {
+ DCHECK(output.empty());
+ auto buffer = IOBuf::create(size);
+ buffer->append(buffer->capacity());
+ output = {buffer->writableData(), buffer->length()};
+ return buffer;
+}
+
+std::unique_ptr<IOBuf> StreamCodec::doCompress(IOBuf const* data) {
+ uint64_t const uncompressedLength = data->computeChainDataLength();
+ resetStream(uncompressedLength);
+ uint64_t const maxCompressedLen = maxCompressedLength(uncompressedLength);
+
+ auto constexpr kMaxSingleStepLength = uint64_t(64) << 20; // 64 MB
+ auto constexpr kDefaultBufferLength = uint64_t(4) << 20; // 4 MB
+
+ MutableByteRange output;
+ auto buffer = addOutputBuffer(
+ output,
+ maxCompressedLen <= kMaxSingleStepLength ? maxCompressedLen
+ : kDefaultBufferLength);
+
+ // Compress the entire IOBuf chain into the IOBuf chain pointed to by buffer
+ IOBuf const* current = data;
+ ByteRange input{current->data(), current->length()};
+ StreamCodec::FlushOp flushOp = StreamCodec::FlushOp::NONE;
+ for (;;) {
+ while (input.empty() && current->next() != data) {
+ current = current->next();
+ input = {current->data(), current->length()};
+ }
+ if (current->next() == data) {
+ // This is the last input buffer so end the stream
+ flushOp = StreamCodec::FlushOp::END;
+ }
+ if (output.empty()) {
+ buffer->prependChain(addOutputBuffer(output, kDefaultBufferLength));
+ }
+ bool const done = compressStream(input, output, flushOp);
+ if (done) {
+ DCHECK(input.empty());
+ DCHECK(flushOp == StreamCodec::FlushOp::END);
+ DCHECK_EQ(current->next(), data);
+ break;
+ }
+ }
+ buffer->prev()->trimEnd(output.size());
+ return buffer;
+}
+
+static uint64_t computeBufferLength(
+ uint64_t const compressedLength,
+ uint64_t const blockSize) {
+ uint64_t constexpr kMaxBufferLength = uint64_t(4) << 20; // 4 MiB
+ uint64_t const goodBufferSize = 4 * std::max(blockSize, compressedLength);
+ return std::min(goodBufferSize, kMaxBufferLength);
+}
+
+std::unique_ptr<IOBuf> StreamCodec::doUncompress(
+ IOBuf const* data,
+ Optional<uint64_t> uncompressedLength) {
+ auto constexpr kMaxSingleStepLength = uint64_t(64) << 20; // 64 MB
+ auto constexpr kBlockSize = uint64_t(128) << 10;
+ auto const defaultBufferLength =
+ computeBufferLength(data->computeChainDataLength(), kBlockSize);
+
+ uncompressedLength = getUncompressedLength(data, uncompressedLength);
+ resetStream(uncompressedLength);
+
+ MutableByteRange output;
+ auto buffer = addOutputBuffer(
+ output,
+ (uncompressedLength && *uncompressedLength <= kMaxSingleStepLength
+ ? *uncompressedLength
+ : defaultBufferLength));
+
+ // Uncompress the entire IOBuf chain into the IOBuf chain pointed to by buffer
+ IOBuf const* current = data;
+ ByteRange input{current->data(), current->length()};
+ StreamCodec::FlushOp flushOp = StreamCodec::FlushOp::NONE;
+ for (;;) {
+ while (input.empty() && current->next() != data) {
+ current = current->next();
+ input = {current->data(), current->length()};
+ }
+ if (current->next() == data) {
+ // Tell the uncompressor there is no more input (it may optimize)
+ flushOp = StreamCodec::FlushOp::END;
+ }
+ if (output.empty()) {
+ buffer->prependChain(addOutputBuffer(output, defaultBufferLength));
+ }
+ bool const done = uncompressStream(input, output, flushOp);
+ if (done) {
+ break;
+ }
+ }
+ if (!input.empty()) {
+ throw std::runtime_error("Codec: Junk after end of data");
+ }
+
+ buffer->prev()->trimEnd(output.size());
+ if (uncompressedLength &&
+ *uncompressedLength != buffer->computeChainDataLength()) {
+ throw std::runtime_error("Codec: invalid uncompressed length");
+ }
+
+ return buffer;
+}
+
namespace {
/**
explicit NoCompressionCodec(int level, CodecType type);
private:
+ uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
std::unique_ptr<IOBuf> doUncompress(
const IOBuf* data,
}
}
+uint64_t NoCompressionCodec::doMaxCompressedLength(
+ uint64_t uncompressedLength) const {
+ return uncompressedLength;
+}
+
std::unique_ptr<IOBuf> NoCompressionCodec::doCompress(
const IOBuf* data) {
return data->clone();
memcpy(&result[0], &prefix, n);
return result;
}
-
-static uint64_t computeBufferLength(
- uint64_t const compressedLength,
- uint64_t const blockSize) {
- uint64_t constexpr kMaxBufferLength = uint64_t(4) << 20; // 4 MiB
- uint64_t const goodBufferSize = 4 * std::max(blockSize, compressedLength);
- return std::min(goodBufferSize, kMaxBufferLength);
-}
} // namespace
#if FOLLY_HAVE_LIBLZ4
private:
bool doNeedsUncompressedLength() const override;
uint64_t doMaxUncompressedLength() const override;
+ uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
bool encodeSize() const { return type() == CodecType::LZ4_VARINT_SIZE; }
return LZ4_MAX_INPUT_SIZE;
}
+uint64_t LZ4Codec::doMaxCompressedLength(uint64_t uncompressedLength) const {
+ return LZ4_compressBound(uncompressedLength) +
+ (encodeSize() ? kMaxVarintLength64 : 0);
+}
+
std::unique_ptr<IOBuf> LZ4Codec::doCompress(const IOBuf* data) {
IOBuf clone;
if (data->isChained()) {
data = &clone;
}
- uint32_t extraSize = encodeSize() ? kMaxVarintLength64 : 0;
- auto out = IOBuf::create(extraSize + LZ4_compressBound(data->length()));
+ auto out = IOBuf::create(maxCompressedLength(data->length()));
if (encodeSize()) {
encodeVarintToIOBuf(data->length(), out.get());
}
const override;
private:
+ uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
+
std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
std::unique_ptr<IOBuf> doUncompress(
const IOBuf* data,
return dataStartsWithLE(data, kLZ4FrameMagicLE);
}
+uint64_t LZ4FrameCodec::doMaxCompressedLength(
+ uint64_t uncompressedLength) const {
+ LZ4F_preferences_t prefs{};
+ prefs.compressionLevel = level_;
+ prefs.frameInfo.contentSize = uncompressedLength;
+ return LZ4F_compressFrameBound(uncompressedLength, &prefs);
+}
+
static size_t lz4FrameThrowOnError(size_t code) {
if (LZ4F_isError(code)) {
throw std::runtime_error(
prefs.compressionLevel = level_;
prefs.frameInfo.contentSize = uncompressedLength;
// Compress
- auto buf = IOBuf::create(LZ4F_compressFrameBound(uncompressedLength, &prefs));
+ auto buf = IOBuf::create(maxCompressedLength(uncompressedLength));
const size_t written = lz4FrameThrowOnError(LZ4F_compressFrame(
buf->writableTail(),
buf->tailroom(),
private:
uint64_t doMaxUncompressedLength() const override;
+ uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
std::unique_ptr<IOBuf> doUncompress(
const IOBuf* data,
return std::numeric_limits<uint32_t>::max();
}
+uint64_t SnappyCodec::doMaxCompressedLength(uint64_t uncompressedLength) const {
+ return snappy::MaxCompressedLength(uncompressedLength);
+}
+
std::unique_ptr<IOBuf> SnappyCodec::doCompress(const IOBuf* data) {
IOBufSnappySource source(data);
- auto out =
- IOBuf::create(snappy::MaxCompressedLength(source.Available()));
+ auto out = IOBuf::create(maxCompressedLength(source.Available()));
snappy::UncheckedByteArraySink sink(reinterpret_cast<char*>(
out->writableTail()));
const override;
private:
+ uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
std::unique_ptr<IOBuf> doUncompress(
const IOBuf* data,
}
}
+uint64_t ZlibCodec::doMaxCompressedLength(uint64_t uncompressedLength) const {
+ return deflateBound(nullptr, uncompressedLength);
+}
+
std::unique_ptr<Codec> ZlibCodec::create(int level, CodecType type) {
return std::make_unique<ZlibCodec>(level, type);
}
private:
bool doNeedsUncompressedLength() const override;
uint64_t doMaxUncompressedLength() const override;
+ uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
bool encodeSize() const { return type() == CodecType::LZMA2_VARINT_SIZE; }
return uint64_t(1) << 63;
}
+uint64_t LZMA2Codec::doMaxCompressedLength(uint64_t uncompressedLength) const {
+ return lzma_stream_buffer_bound(uncompressedLength) +
+ (encodeSize() ? kMaxVarintLength64 : 0);
+}
+
std::unique_ptr<IOBuf> LZMA2Codec::addOutputBuffer(
lzma_stream* stream,
size_t length) {
private:
bool doNeedsUncompressedLength() const override;
+ uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
std::unique_ptr<IOBuf> doUncompress(
const IOBuf* data,
return false;
}
+uint64_t ZSTDCodec::doMaxCompressedLength(uint64_t uncompressedLength) const {
+ return ZSTD_compressBound(uncompressedLength);
+}
+
void zstdThrowIfError(size_t rc) {
if (!ZSTD_isError(rc)) {
return;
zstdThrowIfError(rc);
Cursor cursor(data);
- auto result = IOBuf::createCombined(ZSTD_compressBound(cursor.totalLength()));
+ auto result =
+ IOBuf::createCombined(maxCompressedLength(cursor.totalLength()));
ZSTD_outBuffer out;
out.dst = result->writableTail();
const override;
private:
+ uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
std::unique_ptr<IOBuf> doCompress(IOBuf const* data) override;
std::unique_ptr<IOBuf> doUncompress(
IOBuf const* data,
return dataStartsWithLE(data, kBzip2MagicLE, kBzip2MagicBytes);
}
+uint64_t Bzip2Codec::doMaxCompressedLength(uint64_t uncompressedLength) const {
+ // http://www.bzip.org/1.0.5/bzip2-manual-1.0.5.html#bzbufftobuffcompress
+ // To guarantee that the compressed data will fit in its buffer, allocate an
+ // output buffer of size 1% larger than the uncompressed data, plus six
+ // hundred extra bytes.
+ return uncompressedLength + uncompressedLength / 100 + 600;
+}
+
static bz_stream createBzStream() {
bz_stream stream;
stream.bzalloc = nullptr;
}
}
-static uint64_t bzCompressBound(uint64_t const uncompressedLength) {
- // http://www.bzip.org/1.0.5/bzip2-manual-1.0.5.html#bzbufftobuffcompress
- // To guarantee that the compressed data will fit in its buffer, allocate an
- // output buffer of size 1% larger than the uncompressed data, plus six
- // hundred extra bytes.
- return uncompressedLength + uncompressedLength / 100 + 600;
-}
-
static std::unique_ptr<IOBuf> addOutputBuffer(
bz_stream* stream,
uint64_t const bufferLength) {
};
uint64_t const uncompressedLength = data->computeChainDataLength();
- uint64_t const maxCompressedLength = bzCompressBound(uncompressedLength);
+ uint64_t const maxCompressedLen = maxCompressedLength(uncompressedLength);
uint64_t constexpr kMaxSingleStepLength = uint64_t(64) << 20; // 64 MiB
uint64_t constexpr kDefaultBufferLength = uint64_t(4) << 20;
auto out = addOutputBuffer(
&stream,
- maxCompressedLength <= kMaxSingleStepLength ? maxCompressedLength
- : kDefaultBufferLength);
+ maxCompressedLen <= kMaxSingleStepLength ? maxCompressedLen
+ : kDefaultBufferLength);
for (auto range : *data) {
while (!range.empty()) {
bool doNeedsUncompressedLength() const override;
uint64_t doMaxUncompressedLength() const override;
+ uint64_t doMaxCompressedLength(uint64_t) const override {
+ throw std::runtime_error(
+ "AutomaticCodec error: maxCompressedLength() not supported.");
+ }
std::unique_ptr<IOBuf> doCompress(const IOBuf*) override {
throw std::runtime_error("AutomaticCodec error: compress() not supported.");
}
throw std::runtime_error("AutomaticCodec error: Unknown compressed data");
}
-} // namespace
+using CodecFactory = std::unique_ptr<Codec> (*)(int, CodecType);
+using StreamCodecFactory = std::unique_ptr<StreamCodec> (*)(int, CodecType);
+struct Factory {
+ CodecFactory codec;
+ StreamCodecFactory stream;
+};
-typedef std::unique_ptr<Codec> (*CodecFactory)(int, CodecType);
-static constexpr CodecFactory
+constexpr Factory
codecFactories[static_cast<size_t>(CodecType::NUM_CODEC_TYPES)] = {
- nullptr, // USER_DEFINED
- NoCompressionCodec::create,
+ {}, // USER_DEFINED
+ {NoCompressionCodec::create, nullptr},
#if FOLLY_HAVE_LIBLZ4
- LZ4Codec::create,
+ {LZ4Codec::create, nullptr},
#else
- nullptr,
+ {},
#endif
#if FOLLY_HAVE_LIBSNAPPY
- SnappyCodec::create,
+ {SnappyCodec::create, nullptr},
#else
- nullptr,
+ {},
#endif
#if FOLLY_HAVE_LIBZ
- ZlibCodec::create,
+ {ZlibCodec::create, nullptr},
#else
- nullptr,
+ {},
#endif
#if FOLLY_HAVE_LIBLZ4
- LZ4Codec::create,
+ {LZ4Codec::create, nullptr},
#else
- nullptr,
+ {},
#endif
#if FOLLY_HAVE_LIBLZMA
- LZMA2Codec::create,
- LZMA2Codec::create,
+ {LZMA2Codec::create, nullptr},
+ {LZMA2Codec::create, nullptr},
#else
- nullptr,
- nullptr,
+ {},
+ {},
#endif
#if FOLLY_HAVE_LIBZSTD
- ZSTDCodec::create,
+ {ZSTDCodec::create, nullptr},
#else
- nullptr,
+ {},
#endif
#if FOLLY_HAVE_LIBZ
- ZlibCodec::create,
+ {ZlibCodec::create, nullptr},
#else
- nullptr,
+ {},
#endif
#if (FOLLY_HAVE_LIBLZ4 && LZ4_VERSION_NUMBER >= 10301)
- LZ4FrameCodec::create,
+ {LZ4FrameCodec::create, nullptr},
#else
- nullptr,
+ {},
#endif
#if FOLLY_HAVE_LIBBZ2
- Bzip2Codec::create,
+ {Bzip2Codec::create, nullptr},
#else
- nullptr
+ {},
#endif
};
-bool hasCodec(CodecType type) {
- size_t idx = static_cast<size_t>(type);
+Factory const& getFactory(CodecType type) {
+ size_t const idx = static_cast<size_t>(type);
if (idx >= static_cast<size_t>(CodecType::NUM_CODEC_TYPES)) {
throw std::invalid_argument(
to<std::string>("Compression type ", idx, " invalid"));
}
- return codecFactories[idx] != nullptr;
+ return codecFactories[idx];
+}
+} // namespace
+
+bool hasCodec(CodecType type) {
+ return getFactory(type).codec != nullptr;
}
std::unique_ptr<Codec> getCodec(CodecType type, int level) {
- size_t idx = static_cast<size_t>(type);
- if (idx >= static_cast<size_t>(CodecType::NUM_CODEC_TYPES)) {
+ auto const factory = getFactory(type).codec;
+ if (!factory) {
throw std::invalid_argument(
- to<std::string>("Compression type ", idx, " invalid"));
+ to<std::string>("Compression type ", type, " not supported"));
}
- auto factory = codecFactories[idx];
+ auto codec = (*factory)(level, type);
+ DCHECK(codec->type() == type);
+ return codec;
+}
+
+bool hasStreamCodec(CodecType type) {
+ return getFactory(type).stream != nullptr;
+}
+
+std::unique_ptr<StreamCodec> getStreamCodec(CodecType type, int level) {
+ auto const factory = getFactory(type).stream;
if (!factory) {
- throw std::invalid_argument(to<std::string>(
- "Compression type ", idx, " not supported"));
+ throw std::invalid_argument(
+ to<std::string>("Compression type ", type, " not supported"));
}
auto codec = (*factory)(level, type);
- DCHECK_EQ(static_cast<size_t>(codec->type()), idx);
+ DCHECK(codec->type() == type);
return codec;
}
public:
virtual ~Codec() { }
+ static constexpr uint64_t UNLIMITED_UNCOMPRESSED_LENGTH = uint64_t(-1);
/**
* Return the maximum length of data that may be compressed with this codec.
* NO_COMPRESSION and ZLIB support arbitrary lengths;
* LZ4 supports up to 1.9GiB; SNAPPY supports up to 4GiB.
- * May return UNLIMITED_UNCOMPRESSED_LENGTH (uint64_t(-1)) if unlimited.
+ * May return UNLIMITED_UNCOMPRESSED_LENGTH if unlimited.
*/
uint64_t maxUncompressedLength() const;
* Regardless of the behavior of the underlying compressor, uncompressing
* an empty IOBuf chain will return an empty IOBuf chain.
*/
- static constexpr uint64_t UNLIMITED_UNCOMPRESSED_LENGTH = uint64_t(-1);
-
std::unique_ptr<IOBuf> uncompress(
const IOBuf* data,
folly::Optional<uint64_t> uncompressedLength = folly::none);
StringPiece data,
folly::Optional<uint64_t> uncompressedLength = folly::none);
+ /**
+ * Returns a bound on the maximum compressed length when compressing data with
+ * the given uncompressed length.
+ */
+ uint64_t maxCompressedLength(uint64_t uncompressedLength) const;
+
+ /**
+ * Extracts the uncompressed length from the compressed data if possible.
+ * If the codec doesn't store the uncompressed length, or the data is
+ * corrupted it returns the given uncompressedLength.
+ * If the uncompressed length is stored in the compressed data and
+ * uncompressedLength is not none and they do not match a std::runtime_error
+ * is thrown.
+ */
+ folly::Optional<uint64_t> getUncompressedLength(
+ const folly::IOBuf* data,
+ folly::Optional<uint64_t> uncompressedLength = folly::none) const;
+
protected:
explicit Codec(CodecType type);
StringPiece data,
folly::Optional<uint64_t> uncompressedLength);
+ virtual uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const = 0;
+ // default: returns the passed uncompressedLength.
+ virtual folly::Optional<uint64_t> doGetUncompressedLength(
+ const folly::IOBuf* data,
+ folly::Optional<uint64_t> uncompressedLength) const;
+
CodecType type_;
};
+class StreamCodec : public Codec {
+ public:
+ virtual ~StreamCodec() {}
+
+ /**
+ * Does the codec need the data length before compression streaming?
+ */
+ bool needsDataLength() const;
+
+ /*****************************************************************************
+ * Streaming API
+ *****************************************************************************
+ * A low-level stateful streaming API.
+ * Streaming operations can be started in two ways:
+ * 1. From a clean Codec on which no non-const methods have been called.
+ * 2. A call to resetStream(), which will reset any codec to a clean state.
+ * After a streaming operation has begun, either compressStream() or
+ * uncompressStream() must be called until the streaming operation ends.
+ * compressStream() ends when it returns true with flushOp END.
+ * uncompressStream() ends when it returns true. At this point the codec
+ * may be reused by calling resetStream().
+ *
+ * compress() and uncompress() can be called at any time, but they interrupt
+ * any ongoing streaming operations (state is lost and resetStream() must be
+ * called before another streaming operation).
+ */
+
+ /**
+ * Reset the state of the codec, and set the uncompressed length for the next
+ * streaming operation. If uncompressedLength is not none it must be exactly
+ * the uncompressed length. compressStream() must be passed exactly
+ * uncompressedLength input bytes before the stream is ended.
+ * uncompressStream() must be passed a compressed frame that uncompresses to
+ * uncompressedLength.
+ */
+ void resetStream(folly::Optional<uint64_t> uncompressedLength = folly::none);
+
+ enum class FlushOp { NONE, FLUSH, END };
+
+ /**
+ * Compresses some data from the input buffer and writes the compressed data
+ * into the output buffer. It may read input without producing any output,
+ * except when forced to flush.
+ *
+ * The input buffer is advanced to point to the range of data that hasn't yet
+ * been read. Compression will resume at this point for the next call to
+ * compressStream(). The output buffer is advanced one byte past the last byte
+ * written.
+ *
+ * The default flushOp is NONE, which allows compressStream() complete
+ * discretion in how much data to gather before writing any output.
+ *
+ * If flushOp is END, all pending and input data is flushed to the output
+ * buffer, and the frame is ended. compressStream() must be called with the
+ * same input and flushOp END until it returns true. At this point the caller
+ * must call resetStream() to use the codec again.
+ *
+ * If flushOp is FLUSH, all pending and input data is flushed to the output
+ * buffer, but the frame is not ended. compressStream() must be called with
+ * the same input and flushOp END until it returns true. At this point the
+ * caller can continue to compressStream() with any input data and flushOp.
+ * The uncompressor, if passed all the produced output data, will be able to
+ * uncompress all the input data passed to compressStream() so far. Excessive
+ * use of flushOp FLUSH will deteriorate compression ratio. This is useful for
+ * stateful streaming across a network. Most users don't need to use this
+ * flushOp.
+ *
+ * A std::logic_error is thrown on incorrect usage of the API.
+ * A std::runtime_error is thrown upon error conditions.
+ */
+ bool compressStream(
+ folly::ByteRange& input,
+ folly::MutableByteRange& output,
+ FlushOp flushOp = StreamCodec::FlushOp::NONE);
+
+ /**
+ * Uncompresses some data from the input buffer and writes the uncompressed
+ * data into the output buffer. It may read input without producing any
+ * output.
+ *
+ * The input buffer is advanced to point to the range of data that hasn't yet
+ * been read. Uncompression will resume at this point for the next call to
+ * uncompressStream(). The output buffer is advanced one byte past the last
+ * byte written.
+ *
+ * The default flushOp is NONE, which allows uncompressStream() complete
+ * discretion in how much output data to flush. The uncompressor may not make
+ * maximum forward progress, but will make some forward progress when
+ * possible.
+ *
+ * If flushOp is END, the caller guarantees that no more input will be
+ * presented to uncompressStream(). uncompressStream() must be called with the
+ * same input and flushOp END until it returns true. This is not mandatory,
+ * but if the input is all available in one buffer, and there is enough output
+ * space to write the entire frame, codecs can uncompress faster.
+ *
+ * If flushOp is FLUSH, uncompressStream() is guaranteed to make the maximum
+ * amount of forward progress possible. When using this flushOp and
+ * uncompressStream() returns with `!output.empty()` the caller knows that all
+ * pending output has been flushed. This is useful for stateful streaming
+ * across a network, and it should be used in conjunction with
+ * compressStream() with flushOp FLUSH. Most users don't need to use this
+ * flushOp.
+ *
+ * Returns true at the end of a frame. At this point resetStream() must be
+ * called to reuse the codec.
+ */
+ bool uncompressStream(
+ folly::ByteRange& input,
+ folly::MutableByteRange& output,
+ FlushOp flushOp = StreamCodec::FlushOp::NONE);
+
+ protected:
+ explicit StreamCodec(CodecType type) : Codec(type) {}
+
+ // Returns the uncompressed length last passed to resetStream() or none if it
+ // hasn't been called yet.
+ folly::Optional<uint64_t> uncompressedLength() const {
+ return uncompressedLength_;
+ }
+
+ private:
+ // default: Implemented using the streaming API.
+ std::unique_ptr<IOBuf> doCompress(const folly::IOBuf* data) override;
+ virtual std::unique_ptr<IOBuf> doUncompress(
+ const folly::IOBuf* data,
+ folly::Optional<uint64_t> uncompressedLength) override;
+
+ // default: Returns false
+ virtual bool doNeedsDataLength() const;
+ virtual void doResetStream() = 0;
+ virtual bool doCompressStream(
+ folly::ByteRange& input,
+ folly::MutableByteRange& output,
+ FlushOp flushOp) = 0;
+ virtual bool doUncompressStream(
+ folly::ByteRange& input,
+ folly::MutableByteRange& output,
+ FlushOp flushOp) = 0;
+
+ enum class State {
+ RESET,
+ COMPRESS,
+ COMPRESS_FLUSH,
+ COMPRESS_END,
+ UNCOMPRESS,
+ END,
+ };
+ void assertStateIs(State expected) const;
+
+ CodecType type_;
+ State state_{State::RESET};
+ ByteRange previousInput_{};
+ folly::Optional<uint64_t> uncompressedLength_{};
+};
+
constexpr int COMPRESSION_LEVEL_FASTEST = -1;
constexpr int COMPRESSION_LEVEL_DEFAULT = -2;
constexpr int COMPRESSION_LEVEL_BEST = -3;
* decompress all data compressed with the a codec of the same type, regardless
* of compression level.
*/
-std::unique_ptr<Codec> getCodec(CodecType type,
- int level = COMPRESSION_LEVEL_DEFAULT);
+std::unique_ptr<Codec> getCodec(
+ CodecType type,
+ int level = COMPRESSION_LEVEL_DEFAULT);
+
+/**
+ * Return a codec for the given type. Throws on error. The level
+ * is a non-negative codec-dependent integer indicating the level of
+ * compression desired, or one of the following constants:
+ *
+ * COMPRESSION_LEVEL_FASTEST is fastest (uses least CPU / memory,
+ * worst compression)
+ * COMPRESSION_LEVEL_DEFAULT is the default (likely a tradeoff between
+ * FASTEST and BEST)
+ * COMPRESSION_LEVEL_BEST is the best compression (uses most CPU / memory,
+ * best compression)
+ *
+ * When decompressing, the compression level is ignored. All codecs will
+ * decompress all data compressed with the a codec of the same type, regardless
+ * of compression level.
+ */
+std::unique_ptr<StreamCodec> getStreamCodec(
+ CodecType type,
+ int level = COMPRESSION_LEVEL_DEFAULT);
/**
* Returns a codec that can uncompress any of the given codec types as well as
*/
bool hasCodec(CodecType type);
-}} // namespaces
+/**
+ * Check if a specified codec is supported and supports streaming.
+ */
+bool hasStreamCodec(CodecType type);
+}} // namespaces
#include <folly/io/Compression.h>
+#include <algorithm>
#include <random>
#include <set>
#include <thread>
#include <unordered_map>
+#include <utility>
#include <boost/noncopyable.hpp>
#include <glog/logging.h>
return codecs;
}
+static std::vector<CodecType> availableStreamCodecs() {
+ std::vector<CodecType> codecs;
+
+ for (size_t i = 0; i < static_cast<size_t>(CodecType::NUM_CODEC_TYPES); ++i) {
+ auto type = static_cast<CodecType>(i);
+ if (hasStreamCodec(type)) {
+ codecs.push_back(type);
+ }
+ }
+
+ return codecs;
+}
+
TEST(CompressionTestNeedsUncompressedLength, Simple) {
static const struct { CodecType type; bool needsUncompressedLength; }
expectations[] = {
CodecType::BZIP2,
})));
+class StreamingUnitTest : public testing::TestWithParam<CodecType> {
+ protected:
+ void SetUp() override {
+ codec_ = getStreamCodec(GetParam());
+ }
+
+ std::unique_ptr<StreamCodec> codec_;
+};
+
+TEST_P(StreamingUnitTest, maxCompressedLength) {
+ EXPECT_EQ(0, codec_->maxCompressedLength(0));
+ for (uint64_t const length : {1, 10, 100, 1000, 10000, 100000, 1000000}) {
+ EXPECT_GE(codec_->maxCompressedLength(length), length);
+ }
+}
+
+TEST_P(StreamingUnitTest, getUncompressedLength) {
+ auto const empty = IOBuf::create(0);
+ EXPECT_EQ(uint64_t(0), codec_->getUncompressedLength(empty.get()));
+ EXPECT_EQ(uint64_t(0), codec_->getUncompressedLength(empty.get(), 0));
+
+ auto const data = IOBuf::wrapBuffer(randomDataHolder.data(100));
+ auto const compressed = codec_->compress(data.get());
+
+ EXPECT_ANY_THROW(codec_->getUncompressedLength(data.get(), 0));
+ if (auto const length = codec_->getUncompressedLength(data.get())) {
+ EXPECT_EQ(100, *length);
+ }
+ EXPECT_EQ(uint64_t(100), codec_->getUncompressedLength(data.get(), 100));
+ // If the uncompressed length is stored in the frame, then make sure it throws
+ // when it is given the wrong length.
+ if (codec_->getUncompressedLength(data.get()) == uint64_t(100)) {
+ EXPECT_ANY_THROW(codec_->getUncompressedLength(data.get(), 200));
+ }
+}
+
+TEST_P(StreamingUnitTest, emptyData) {
+ ByteRange input{};
+ auto buffer = IOBuf::create(1);
+ buffer->append(buffer->capacity());
+ MutableByteRange output{};
+
+ // Test compressing empty data in one pass
+ EXPECT_TRUE(codec_->compressStream(input, output, StreamCodec::FlushOp::END));
+ codec_->resetStream(0);
+ EXPECT_TRUE(codec_->compressStream(input, output, StreamCodec::FlushOp::END));
+ codec_->resetStream();
+ output = {buffer->writableData(), buffer->length()};
+ EXPECT_TRUE(codec_->compressStream(input, output, StreamCodec::FlushOp::END));
+ EXPECT_EQ(buffer->length(), output.size());
+
+ // Test compressing empty data with multiple calls to compressStream()
+ codec_->resetStream();
+ output = {};
+ EXPECT_FALSE(codec_->compressStream(input, output));
+ EXPECT_TRUE(
+ codec_->compressStream(input, output, StreamCodec::FlushOp::FLUSH));
+ EXPECT_TRUE(codec_->compressStream(input, output, StreamCodec::FlushOp::END));
+ codec_->resetStream();
+ output = {buffer->writableData(), buffer->length()};
+ EXPECT_FALSE(codec_->compressStream(input, output));
+ EXPECT_TRUE(
+ codec_->compressStream(input, output, StreamCodec::FlushOp::FLUSH));
+ EXPECT_TRUE(codec_->compressStream(input, output, StreamCodec::FlushOp::END));
+ EXPECT_EQ(buffer->length(), output.size());
+
+ // Test uncompressing empty data
+ output = {};
+ codec_->resetStream();
+ EXPECT_TRUE(codec_->uncompressStream(input, output));
+ codec_->resetStream();
+ EXPECT_TRUE(
+ codec_->uncompressStream(input, output, StreamCodec::FlushOp::FLUSH));
+ codec_->resetStream();
+ EXPECT_TRUE(
+ codec_->uncompressStream(input, output, StreamCodec::FlushOp::END));
+ codec_->resetStream(0);
+ EXPECT_TRUE(codec_->uncompressStream(input, output));
+ codec_->resetStream(0);
+ EXPECT_TRUE(
+ codec_->uncompressStream(input, output, StreamCodec::FlushOp::FLUSH));
+ codec_->resetStream(0);
+ EXPECT_TRUE(
+ codec_->uncompressStream(input, output, StreamCodec::FlushOp::END));
+}
+
+TEST_P(StreamingUnitTest, noForwardProgressOkay) {
+ auto inBuffer = IOBuf::create(2);
+ inBuffer->writableData()[0] = 'a';
+ inBuffer->writableData()[0] = 'a';
+ inBuffer->append(2);
+ auto input = inBuffer->coalesce();
+ auto compressed = codec_->compress(inBuffer.get());
+
+ auto outBuffer = IOBuf::create(codec_->maxCompressedLength(2));
+ MutableByteRange output{outBuffer->writableTail(), outBuffer->tailroom()};
+
+ ByteRange emptyInput;
+ MutableByteRange emptyOutput;
+
+ // Compress some data to avoid empty data special casing
+ codec_->resetStream();
+ while (!input.empty()) {
+ codec_->compressStream(input, output);
+ }
+ // empty input and output is okay for flush NONE and FLUSH.
+ codec_->compressStream(emptyInput, emptyOutput);
+ codec_->compressStream(emptyInput, emptyOutput, StreamCodec::FlushOp::FLUSH);
+
+ codec_->resetStream();
+ input = inBuffer->coalesce();
+ output = {outBuffer->writableTail(), outBuffer->tailroom()};
+ while (!input.empty()) {
+ codec_->compressStream(input, output);
+ }
+ // empty input and output is okay for flush END.
+ codec_->compressStream(emptyInput, emptyOutput, StreamCodec::FlushOp::END);
+
+ codec_->resetStream();
+ input = compressed->coalesce();
+ input.uncheckedSubtract(1); // Remove last byte so the operation is incomplete
+ output = {inBuffer->writableData(), inBuffer->length()};
+ // Uncompress some data to avoid empty data special casing
+ while (!input.empty()) {
+ EXPECT_FALSE(codec_->uncompressStream(input, output));
+ }
+ // empty input and output is okay for all flush values.
+ EXPECT_FALSE(codec_->uncompressStream(emptyInput, emptyOutput));
+ EXPECT_FALSE(codec_->uncompressStream(
+ emptyInput, emptyOutput, StreamCodec::FlushOp::FLUSH));
+ EXPECT_FALSE(codec_->uncompressStream(
+ emptyInput, emptyOutput, StreamCodec::FlushOp::END));
+}
+
+TEST_P(StreamingUnitTest, stateTransitions) {
+ auto inBuffer = IOBuf::create(1);
+ inBuffer->writableData()[0] = 'a';
+ inBuffer->append(1);
+ auto compressed = codec_->compress(inBuffer.get());
+ ByteRange const in = compressed->coalesce();
+ auto outBuffer = IOBuf::create(codec_->maxCompressedLength(in.size()));
+ MutableByteRange const out{outBuffer->writableTail(), outBuffer->tailroom()};
+
+ auto compress = [&](
+ StreamCodec::FlushOp flushOp = StreamCodec::FlushOp::NONE,
+ bool empty = false) {
+ auto input = in;
+ auto output = empty ? MutableByteRange{} : out;
+ return codec_->compressStream(input, output, flushOp);
+ };
+ auto uncompress = [&](
+ StreamCodec::FlushOp flushOp = StreamCodec::FlushOp::NONE,
+ bool empty = false) {
+ auto input = in;
+ auto output = empty ? MutableByteRange{} : out;
+ return codec_->uncompressStream(input, output, flushOp);
+ };
+
+ // compression flow
+ codec_->resetStream();
+ EXPECT_FALSE(compress());
+ EXPECT_FALSE(compress());
+ EXPECT_TRUE(compress(StreamCodec::FlushOp::FLUSH));
+ EXPECT_FALSE(compress());
+ EXPECT_TRUE(compress(StreamCodec::FlushOp::END));
+ // uncompression flow
+ codec_->resetStream();
+ EXPECT_FALSE(uncompress(StreamCodec::FlushOp::NONE, true));
+ codec_->resetStream();
+ EXPECT_FALSE(uncompress(StreamCodec::FlushOp::FLUSH, true));
+ codec_->resetStream();
+ EXPECT_FALSE(uncompress(StreamCodec::FlushOp::NONE, true));
+ codec_->resetStream();
+ EXPECT_FALSE(uncompress(StreamCodec::FlushOp::NONE, true));
+ codec_->resetStream();
+ EXPECT_TRUE(uncompress(StreamCodec::FlushOp::FLUSH));
+ // compress -> uncompress
+ codec_->resetStream();
+ EXPECT_FALSE(compress());
+ EXPECT_THROW(uncompress(), std::logic_error);
+ // uncompress -> compress
+ codec_->resetStream();
+ EXPECT_TRUE(uncompress(StreamCodec::FlushOp::FLUSH));
+ EXPECT_THROW(compress(), std::logic_error);
+ // end -> compress
+ codec_->resetStream();
+ EXPECT_FALSE(compress());
+ EXPECT_TRUE(compress(StreamCodec::FlushOp::END));
+ EXPECT_THROW(compress(), std::logic_error);
+ // end -> uncompress
+ codec_->resetStream();
+ EXPECT_TRUE(uncompress(StreamCodec::FlushOp::FLUSH));
+ EXPECT_THROW(uncompress(), std::logic_error);
+ // flush -> compress
+ codec_->resetStream();
+ EXPECT_FALSE(compress(StreamCodec::FlushOp::FLUSH, true));
+ EXPECT_THROW(compress(), std::logic_error);
+ // flush -> end
+ codec_->resetStream();
+ EXPECT_FALSE(compress(StreamCodec::FlushOp::FLUSH, true));
+ EXPECT_THROW(compress(StreamCodec::FlushOp::END), std::logic_error);
+ // undefined -> compress
+ codec_->compress(inBuffer.get());
+ EXPECT_THROW(compress(), std::logic_error);
+ codec_->uncompress(compressed.get());
+ EXPECT_THROW(compress(), std::logic_error);
+ // undefined -> undefined
+ codec_->uncompress(compressed.get());
+ codec_->compress(inBuffer.get());
+}
+
+INSTANTIATE_TEST_CASE_P(
+ StreamingUnitTest,
+ StreamingUnitTest,
+ testing::ValuesIn(availableStreamCodecs()));
+
+class StreamingCompressionTest
+ : public testing::TestWithParam<std::tuple<int, int, CodecType>> {
+ protected:
+ void SetUp() override {
+ auto const tup = GetParam();
+ uncompressedLength_ = uint64_t(1) << std::get<0>(tup);
+ chunkSize_ = size_t(1) << std::get<1>(tup);
+ codec_ = getStreamCodec(std::get<2>(tup));
+ }
+
+ void runResetStreamTest(DataHolder const& dh);
+ void runCompressStreamTest(DataHolder const& dh);
+ void runUncompressStreamTest(DataHolder const& dh);
+ void runFlushTest(DataHolder const& dh);
+
+ private:
+ std::vector<ByteRange> split(ByteRange data) const;
+
+ uint64_t uncompressedLength_;
+ size_t chunkSize_;
+ std::unique_ptr<StreamCodec> codec_;
+};
+
+std::vector<ByteRange> StreamingCompressionTest::split(ByteRange data) const {
+ size_t const pieces = std::max<size_t>(1, data.size() / chunkSize_);
+ std::vector<ByteRange> result;
+ result.reserve(pieces + 1);
+ while (!data.empty()) {
+ size_t const pieceSize = std::min(data.size(), chunkSize_);
+ result.push_back(data.subpiece(0, pieceSize));
+ data.uncheckedAdvance(pieceSize);
+ }
+ return result;
+}
+
+static std::unique_ptr<IOBuf> compressSome(
+ StreamCodec* codec,
+ ByteRange data,
+ uint64_t bufferSize,
+ StreamCodec::FlushOp flush) {
+ bool result;
+ IOBufQueue queue;
+ do {
+ auto buffer = IOBuf::create(bufferSize);
+ buffer->append(buffer->capacity());
+ MutableByteRange output{buffer->writableData(), buffer->length()};
+
+ result = codec->compressStream(data, output, flush);
+ buffer->trimEnd(output.size());
+ queue.append(std::move(buffer));
+
+ } while (!(flush == StreamCodec::FlushOp::NONE && data.empty()) && !result);
+ EXPECT_TRUE(data.empty());
+ return queue.move();
+}
+
+static std::pair<bool, std::unique_ptr<IOBuf>> uncompressSome(
+ StreamCodec* codec,
+ ByteRange& data,
+ uint64_t bufferSize,
+ StreamCodec::FlushOp flush) {
+ bool result;
+ IOBufQueue queue;
+ do {
+ auto buffer = IOBuf::create(bufferSize);
+ buffer->append(buffer->capacity());
+ MutableByteRange output{buffer->writableData(), buffer->length()};
+
+ result = codec->uncompressStream(data, output, flush);
+ buffer->trimEnd(output.size());
+ queue.append(std::move(buffer));
+
+ } while (queue.tailroom() == 0 && !result);
+ return std::make_pair(result, queue.move());
+}
+
+void StreamingCompressionTest::runResetStreamTest(DataHolder const& dh) {
+ auto const input = dh.data(uncompressedLength_);
+ // Compress some but leave state unclean
+ codec_->resetStream(uncompressedLength_);
+ compressSome(codec_.get(), input, chunkSize_, StreamCodec::FlushOp::NONE);
+ // Reset stream and compress all
+ codec_->resetStream();
+ auto compressed =
+ compressSome(codec_.get(), input, chunkSize_, StreamCodec::FlushOp::END);
+ auto const uncompressed = codec_->uncompress(compressed.get(), input.size());
+ EXPECT_EQ(dh.hash(uncompressedLength_), hashIOBuf(uncompressed.get()));
+}
+
+TEST_P(StreamingCompressionTest, resetStream) {
+ runResetStreamTest(constantDataHolder);
+ runResetStreamTest(randomDataHolder);
+}
+
+void StreamingCompressionTest::runCompressStreamTest(
+ const folly::io::test::DataHolder& dh) {
+ auto const inputs = split(dh.data(uncompressedLength_));
+
+ IOBufQueue queue;
+ codec_->resetStream(uncompressedLength_);
+ // Compress many inputs in a row
+ for (auto const input : inputs) {
+ queue.append(compressSome(
+ codec_.get(), input, chunkSize_, StreamCodec::FlushOp::NONE));
+ }
+ // Finish the operation with empty input.
+ ByteRange empty;
+ queue.append(
+ compressSome(codec_.get(), empty, chunkSize_, StreamCodec::FlushOp::END));
+
+ auto const uncompressed = codec_->uncompress(queue.front());
+ EXPECT_EQ(dh.hash(uncompressedLength_), hashIOBuf(uncompressed.get()));
+}
+
+TEST_P(StreamingCompressionTest, compressStream) {
+ runCompressStreamTest(constantDataHolder);
+ runCompressStreamTest(randomDataHolder);
+}
+
+void StreamingCompressionTest::runUncompressStreamTest(
+ const folly::io::test::DataHolder& dh) {
+ auto const data = IOBuf::wrapBuffer(dh.data(uncompressedLength_));
+ // Concatenate 3 compressed frames in a row
+ auto compressed = codec_->compress(data.get());
+ compressed->prependChain(codec_->compress(data.get()));
+ compressed->prependChain(codec_->compress(data.get()));
+ // Pass all 3 compressed frames in one input buffer
+ auto input = compressed->coalesce();
+ // Uncompress the first frame
+ codec_->resetStream(data->computeChainDataLength());
+ {
+ auto const result = uncompressSome(
+ codec_.get(), input, chunkSize_, StreamCodec::FlushOp::FLUSH);
+ ASSERT_TRUE(result.first);
+ ASSERT_EQ(hashIOBuf(data.get()), hashIOBuf(result.second.get()));
+ }
+ // Uncompress the second frame
+ codec_->resetStream();
+ {
+ auto const result = uncompressSome(
+ codec_.get(), input, chunkSize_, StreamCodec::FlushOp::END);
+ ASSERT_TRUE(result.first);
+ ASSERT_EQ(hashIOBuf(data.get()), hashIOBuf(result.second.get()));
+ }
+ // Uncompress the third frame
+ codec_->resetStream();
+ {
+ auto const result = uncompressSome(
+ codec_.get(), input, chunkSize_, StreamCodec::FlushOp::FLUSH);
+ ASSERT_TRUE(result.first);
+ ASSERT_EQ(hashIOBuf(data.get()), hashIOBuf(result.second.get()));
+ }
+ EXPECT_TRUE(input.empty());
+}
+
+TEST_P(StreamingCompressionTest, uncompressStream) {
+ runUncompressStreamTest(constantDataHolder);
+ runUncompressStreamTest(randomDataHolder);
+}
+
+void StreamingCompressionTest::runFlushTest(DataHolder const& dh) {
+ auto const inputs = split(dh.data(uncompressedLength_));
+ auto uncodec = getStreamCodec(codec_->type());
+
+ codec_->resetStream();
+ for (auto input : inputs) {
+ // Compress some data and flush the stream
+ auto compressed = compressSome(
+ codec_.get(), input, chunkSize_, StreamCodec::FlushOp::FLUSH);
+ auto compressedRange = compressed->coalesce();
+ // Uncompress the compressed data
+ auto result = uncompressSome(
+ uncodec.get(),
+ compressedRange,
+ chunkSize_,
+ StreamCodec::FlushOp::FLUSH);
+ // All compressed data should have been consumed
+ EXPECT_TRUE(compressedRange.empty());
+ // The frame isn't complete
+ EXPECT_FALSE(result.first);
+ // The uncompressed data should be exactly the input data
+ EXPECT_EQ(input.size(), result.second->computeChainDataLength());
+ auto const data = IOBuf::wrapBuffer(input);
+ EXPECT_EQ(hashIOBuf(data.get()), hashIOBuf(result.second.get()));
+ }
+}
+
+TEST_P(StreamingCompressionTest, testFlush) {
+ runFlushTest(constantDataHolder);
+ runFlushTest(randomDataHolder);
+}
+
+INSTANTIATE_TEST_CASE_P(
+ StreamingCompressionTest,
+ StreamingCompressionTest,
+ testing::Combine(
+ testing::Values(0, 1, 12, 22, 27),
+ testing::Values(12, 17, 20),
+ testing::ValuesIn(availableStreamCodecs())));
+
class AutomaticCodecTest : public testing::TestWithParam<CodecType> {
protected:
void SetUp() override {
return {prefix_};
}
+ uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override {
+ return codec_->maxCompressedLength(uncompressedLength) + prefix_.size();
+ }
+
bool canUncompress(const IOBuf* data, Optional<uint64_t>) const override {
auto clone = data->cloneCoalescedAsValue();
if (clone.length() < prefix_.size()) {