From cca8b5c3bbaa204e56384720a3b1a0f8280fb54b Mon Sep 17 00:00:00 2001 From: Nick Terrell Date: Fri, 24 Mar 2017 11:08:17 -0700 Subject: [PATCH] Add LZ4_FRAME codec Summary: The LZ4 Frame codec encodes data using the LZ4 frame format. One advantage of the LZ4 frame format is that it has 4 magic bytes in the header, so users can transparently determine compression type. It also allows the user to interop with the lz4 command line tool. Reviewed By: yfeldblum Differential Revision: D4715918 fbshipit-source-id: 689833fef526b1cfe98685179e7b494380d49cba --- folly/io/Compression.cpp | 162 +++++++++++++++++++++++++++++- folly/io/Compression.h | 8 +- folly/io/test/CompressionTest.cpp | 5 +- 3 files changed, 171 insertions(+), 4 deletions(-) diff --git a/folly/io/Compression.cpp b/folly/io/Compression.cpp index d618da0c..e0beac2a 100644 --- a/folly/io/Compression.cpp +++ b/folly/io/Compression.cpp @@ -18,6 +18,7 @@ #if FOLLY_HAVE_LIBLZ4 #include +#include #include #endif @@ -383,7 +384,160 @@ std::unique_ptr LZ4Codec::doUncompress( return out; } -#endif // FOLLY_HAVE_LIBLZ4 +class LZ4FrameCodec final : public Codec { + public: + static std::unique_ptr create(int level, CodecType type); + explicit LZ4FrameCodec(int level, CodecType type); + ~LZ4FrameCodec(); + + private: + std::unique_ptr doCompress(const IOBuf* data) override; + std::unique_ptr doUncompress( + const IOBuf* data, + uint64_t uncompressedLength) override; + + // Reset the dctx_ if it is dirty or null. + void resetDCtx(); + + int level_; + LZ4F_dctx* dctx_{nullptr}; + bool dirty_{false}; +}; + +/* static */ std::unique_ptr LZ4FrameCodec::create( + int level, + CodecType type) { + return make_unique(level, type); +} + +static size_t lz4FrameThrowOnError(size_t code) { + if (LZ4F_isError(code)) { + throw std::runtime_error( + to("LZ4Frame error: ", LZ4F_getErrorName(code))); + } + return code; +} + +void LZ4FrameCodec::resetDCtx() { + if (dctx_ && !dirty_) { + return; + } + if (dctx_) { + LZ4F_freeDecompressionContext(dctx_); + } + lz4FrameThrowOnError(LZ4F_createDecompressionContext(&dctx_, 100)); + dirty_ = false; +} + +LZ4FrameCodec::LZ4FrameCodec(int level, CodecType type) : Codec(type) { + DCHECK(type == CodecType::LZ4_FRAME); + switch (level) { + case COMPRESSION_LEVEL_FASTEST: + case COMPRESSION_LEVEL_DEFAULT: + level_ = 0; + break; + case COMPRESSION_LEVEL_BEST: + level_ = 16; + break; + default: + level_ = level; + break; + } +} + +LZ4FrameCodec::~LZ4FrameCodec() { + if (dctx_) { + LZ4F_freeDecompressionContext(dctx_); + } +} + +std::unique_ptr LZ4FrameCodec::doCompress(const IOBuf* data) { + // LZ4 Frame compression doesn't support streaming so we have to coalesce + IOBuf clone; + if (data->isChained()) { + clone = data->cloneCoalescedAsValue(); + data = &clone; + } + // Set preferences + const auto uncompressedLength = data->length(); + LZ4F_preferences_t prefs{}; + prefs.compressionLevel = level_; + prefs.frameInfo.contentSize = uncompressedLength; + // Compress + auto buf = IOBuf::create(LZ4F_compressFrameBound(uncompressedLength, &prefs)); + const size_t written = lz4FrameThrowOnError(LZ4F_compressFrame( + buf->writableTail(), + buf->tailroom(), + data->data(), + data->length(), + &prefs)); + buf->append(written); + return buf; +} + +std::unique_ptr LZ4FrameCodec::doUncompress( + const IOBuf* data, + uint64_t uncompressedLength) { + // Reset the dctx if any errors have occurred + resetDCtx(); + // Coalesce the data + ByteRange in = *data->begin(); + IOBuf clone; + if (data->isChained()) { + clone = data->cloneCoalescedAsValue(); + in = clone.coalesce(); + } + data = nullptr; + // Select decompression options + LZ4F_decompressOptions_t options; + options.stableDst = 1; + // Select blockSize and growthSize for the IOBufQueue + IOBufQueue queue(IOBufQueue::cacheChainLength()); + auto blockSize = uint64_t{64} << 10; + auto growthSize = uint64_t{4} << 20; + if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH) { + // Allocate uncompressedLength in one chunk (up to 64 MB) + const auto allocateSize = std::min(uncompressedLength, uint64_t{64} << 20); + queue.preallocate(allocateSize, allocateSize); + blockSize = std::min(uncompressedLength, blockSize); + growthSize = std::min(uncompressedLength, growthSize); + } else { + // Reduce growthSize for small data + const auto guessUncompressedLen = 4 * std::max(blockSize, in.size()); + growthSize = std::min(guessUncompressedLen, growthSize); + } + // Once LZ4_decompress() is called, the dctx_ cannot be reused until it + // returns 0 + dirty_ = true; + // Decompress until the frame is over + size_t code = 0; + do { + // Allocate enough space to decompress at least a block + void* out; + size_t outSize; + std::tie(out, outSize) = queue.preallocate(blockSize, growthSize); + // Decompress + size_t inSize = in.size(); + code = lz4FrameThrowOnError( + LZ4F_decompress(dctx_, out, &outSize, in.data(), &inSize, &options)); + if (in.empty() && outSize == 0 && code != 0) { + // We passed no input, no output was produced, and the frame isn't over + // No more forward progress is possible + throw std::runtime_error("LZ4Frame error: Incomplete frame"); + } + in.uncheckedAdvance(inSize); + queue.postallocate(outSize); + } while (code != 0); + // At this point the decompression context can be reused + dirty_ = false; + if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH && + queue.chainLength() != uncompressedLength) { + throw std::runtime_error("LZ4Frame error: Invalid uncompressedLength"); + } + return queue.move(); +} + +#endif // FOLLY_HAVE_LIBLZ4 #if FOLLY_HAVE_LIBSNAPPY @@ -1238,6 +1392,12 @@ static constexpr CodecFactory #else nullptr, #endif + +#if FOLLY_HAVE_LIBLZ4 + LZ4FrameCodec::create, +#else + nullptr, +#endif }; bool hasCodec(CodecType type) { diff --git a/folly/io/Compression.h b/folly/io/Compression.h index 05ce06d1..6d6ae3a4 100644 --- a/folly/io/Compression.h +++ b/folly/io/Compression.h @@ -85,7 +85,13 @@ enum class CodecType { */ GZIP = 9, - NUM_CODEC_TYPES = 10, + /** + * Use LZ4 frame compression. + * Levels supported: 0 = fast, 16 = best; default = 0 + */ + LZ4_FRAME = 10, + + NUM_CODEC_TYPES = 11, }; class Codec { diff --git a/folly/io/test/CompressionTest.cpp b/folly/io/test/CompressionTest.cpp index e468287f..b1599f8d 100644 --- a/folly/io/test/CompressionTest.cpp +++ b/folly/io/test/CompressionTest.cpp @@ -159,6 +159,7 @@ TEST(CompressionTestNeedsUncompressedLength, Simple) { { CodecType::LZMA2_VARINT_SIZE, false }, { CodecType::ZSTD, false }, { CodecType::GZIP, false }, + { CodecType::LZ4_FRAME, false }, }; for (auto const& test : expectations) { @@ -391,8 +392,8 @@ INSTANTIATE_TEST_CASE_P( supportedCodecs({ CodecType::SNAPPY, CodecType::ZLIB, - }))); - + CodecType::LZ4_FRAME, + }))); }}} // namespaces int main(int argc, char *argv[]) { -- 2.34.1