From d216a9bd40ed48bf9f6060895287624b90fc677c Mon Sep 17 00:00:00 2001
From: Nick Terrell
Date: Wed, 12 Apr 2017 19:43:02 -0700
Subject: [PATCH] Add bzip2 support

Summary: Adds bzip2 support to `folly/io/Compression.h`.
Adds bzip2 to the default set of supported codecs for the `AutomaticCodec`.

Reviewed By: yfeldblum

Differential Revision: D4873771

fbshipit-source-id: d4f4861aef7e4b9efb67095e8892c265b5ae5557
---
Review notes (this area is not part of the commit message):

* Bzip2Codec::doUncompress: the loop that drains the decompressor after all
  input has been fed used to call BZ2_bzDecompress() until BZ_STREAM_END
  unconditionally. At that point no input is left, so a BZ_OK result with
  free output space means the decompressor is waiting for bytes that will
  never arrive (truncated or empty input) and the loop never terminated.
  It now throws std::runtime_error("Bzip2 error: Truncated input") instead.
  The change keeps the number of added lines identical, so hunk headers and
  the diffstat are unchanged; the post-image blob id on the Compression.cpp
  `index` line no longer matches the modified content.
* The last entry of the codec factory table now ends in a comma like its
  siblings.
* This patch was recovered from text in which line breaks were collapsed and
  everything inside angle brackets was missing. Indentation was rebuilt by
  convention. Template arguments and the bzlib header name were restored
  where the surrounding code determines them, and std::min was given an
  explicit <size_t>. The bare `#include` context lines in the first
  Compression.cpp hunk and the author e-mail could not be recovered; check
  them against the tree before applying.

 folly/configure.ac                |   1 +
 folly/io/Compression.cpp          | 235 +++++++++++++++++++++++++++++-
 folly/io/Compression.h            |  10 +-
 folly/io/test/CompressionTest.cpp |   5 +-
 4 files changed, 240 insertions(+), 11 deletions(-)

diff --git a/folly/configure.ac b/folly/configure.ac
index db8ff1b9..22e2f9fa 100644
--- a/folly/configure.ac
+++ b/folly/configure.ac
@@ -554,6 +554,7 @@ AC_CHECK_HEADER([snappy.h], AC_CHECK_LIB([snappy], [main]))
 AC_CHECK_HEADER([zlib.h], AC_CHECK_LIB([z], [main]))
 AC_CHECK_HEADER([lzma.h], AC_CHECK_LIB([lzma], [main]))
 AC_CHECK_HEADER([zstd.h], AC_CHECK_LIB([zstd], [ZSTD_compressStream]))
+AC_CHECK_HEADER([bzlib.h], AC_CHECK_LIB([bz2], [main]))
 AC_CHECK_HEADER([linux/membarrier.h], AC_DEFINE([HAVE_LINUX_MEMBARRIER_H], [1], [Define to 1 if membarrier.h is available]))
 
 AC_ARG_ENABLE([follytestmain],
diff --git a/folly/io/Compression.cpp b/folly/io/Compression.cpp
index 4a9deaaf..3bcae861 100644
--- a/folly/io/Compression.cpp
+++ b/folly/io/Compression.cpp
@@ -43,6 +43,10 @@
 #include
 #endif
 
+#if FOLLY_HAVE_LIBBZ2
+#include <bzlib.h>
+#endif
+
 #include
 #include
 #include
@@ -285,6 +289,14 @@ prefixToStringLE(T prefix, uint64_t n = sizeof(T)) {
   memcpy(&result[0], &prefix, n);
   return result;
 }
+
+static uint64_t computeBufferLength(
+    uint64_t const compressedLength,
+    uint64_t const blockSize) {
+  uint64_t constexpr kMaxBufferLength = uint64_t(4) << 20; // 4 MiB
+  uint64_t const goodBufferSize = 4 * std::max(blockSize, compressedLength);
+  return std::min(goodBufferSize, kMaxBufferLength);
+}
 } // namespace
 
 #if FOLLY_HAVE_LIBLZ4
@@ -969,13 +981,6 @@ std::unique_ptr<IOBuf> ZlibCodec::doCompress(const IOBuf* data) {
   return out;
 }
 
-static uint64_t computeBufferLength(uint64_t const compressedLength) {
-  constexpr uint64_t kMaxBufferLength = uint64_t(4) << 20; // 4 MiB
-  constexpr uint64_t kBlockSize = uint64_t(32) << 10; // 32 KiB
-  const uint64_t goodBufferSize = 4 * std::max(kBlockSize, compressedLength);
-  return std::min(goodBufferSize, kMaxBufferLength);
-}
-
 std::unique_ptr<IOBuf> ZlibCodec::doUncompress(const IOBuf* data,
                                                uint64_t uncompressedLength) {
   z_stream stream;
@@ -1009,8 +1014,9 @@ std::unique_ptr<IOBuf> ZlibCodec::doUncompress(const IOBuf* data,
 
   // Max 64MiB in one go
   constexpr uint64_t maxSingleStepLength = uint64_t(64) << 20; // 64MiB
+  constexpr uint64_t kBlockSize = uint64_t(32) << 10; // 32 KiB
   const uint64_t defaultBufferLength =
-      computeBufferLength(data->computeChainDataLength());
+      computeBufferLength(data->computeChainDataLength(), kBlockSize);
 
   auto out = addOutputBuffer(
       &stream,
@@ -1551,6 +1557,212 @@ std::unique_ptr<IOBuf> ZSTDCodec::doUncompress(
 
 #endif // FOLLY_HAVE_LIBZSTD
 
+#if FOLLY_HAVE_LIBBZ2
+
+class Bzip2Codec final : public Codec {
+ public:
+  static std::unique_ptr<Codec> create(int level, CodecType type);
+  explicit Bzip2Codec(int level, CodecType type);
+
+  std::vector<std::string> validPrefixes() const override;
+  bool canUncompress(IOBuf const* data, uint64_t uncompressedLength)
+      const override;
+
+ private:
+  std::unique_ptr<IOBuf> doCompress(IOBuf const* data) override;
+  std::unique_ptr<IOBuf> doUncompress(
+      IOBuf const* data,
+      uint64_t uncompressedLength) override;
+
+  int level_;
+};
+
+/* static */ std::unique_ptr<Codec> Bzip2Codec::create(
+    int level,
+    CodecType type) {
+  return make_unique<Bzip2Codec>(level, type);
+}
+
+Bzip2Codec::Bzip2Codec(int level, CodecType type) : Codec(type) {
+  DCHECK(type == CodecType::BZIP2);
+  switch (level) {
+    case COMPRESSION_LEVEL_FASTEST:
+      level = 1;
+      break;
+    case COMPRESSION_LEVEL_DEFAULT:
+      level = 9;
+      break;
+    case COMPRESSION_LEVEL_BEST:
+      level = 9;
+      break;
+  }
+  if (level < 1 || level > 9) {
+    throw std::invalid_argument(
+        to<std::string>("Bzip2: invalid level: ", level));
+  }
+  level_ = level;
+}
+
+static uint32_t constexpr kBzip2MagicLE = 0x685a42;
+static uint64_t constexpr kBzip2MagicBytes = 3;
+
+std::vector<std::string> Bzip2Codec::validPrefixes() const {
+  return {prefixToStringLE(kBzip2MagicLE, kBzip2MagicBytes)};
+}
+
+bool Bzip2Codec::canUncompress(IOBuf const* data, uint64_t) const {
+  return dataStartsWithLE(data, kBzip2MagicLE, kBzip2MagicBytes);
+}
+
+static bz_stream createBzStream() {
+  bz_stream stream;
+  stream.bzalloc = nullptr;
+  stream.bzfree = nullptr;
+  stream.opaque = nullptr;
+  stream.next_in = stream.next_out = nullptr;
+  stream.avail_in = stream.avail_out = 0;
+  return stream;
+}
+
+// Throws on error condition, otherwise returns the code.
+static int bzCheck(int const rc) {
+  switch (rc) {
+    case BZ_OK:
+    case BZ_RUN_OK:
+    case BZ_FLUSH_OK:
+    case BZ_FINISH_OK:
+    case BZ_STREAM_END:
+      return rc;
+    default:
+      throw std::runtime_error(to<std::string>("Bzip2 error: ", rc));
+  }
+}
+
+static uint64_t bzCompressBound(uint64_t const uncompressedLength) {
+  // http://www.bzip.org/1.0.5/bzip2-manual-1.0.5.html#bzbufftobuffcompress
+  // To guarantee that the compressed data will fit in its buffer, allocate an
+  // output buffer of size 1% larger than the uncompressed data, plus six
+  // hundred extra bytes.
+  return uncompressedLength + uncompressedLength / 100 + 600;
+}
+
+static std::unique_ptr<IOBuf> addOutputBuffer(
+    bz_stream* stream,
+    uint64_t const bufferLength) {
+  DCHECK_LE(bufferLength, std::numeric_limits<uint32_t>::max());
+  DCHECK_EQ(stream->avail_out, 0);
+
+  auto buf = IOBuf::create(bufferLength);
+  buf->append(buf->capacity());
+
+  stream->next_out = reinterpret_cast<char*>(buf->writableData());
+  stream->avail_out = buf->length();
+
+  return buf;
+}
+
+std::unique_ptr<IOBuf> Bzip2Codec::doCompress(IOBuf const* data) {
+  bz_stream stream = createBzStream();
+  bzCheck(BZ2_bzCompressInit(&stream, level_, 0, 0));
+  SCOPE_EXIT {
+    bzCheck(BZ2_bzCompressEnd(&stream));
+  };
+
+  uint64_t const uncompressedLength = data->computeChainDataLength();
+  uint64_t const maxCompressedLength = bzCompressBound(uncompressedLength);
+  uint64_t constexpr kMaxSingleStepLength = uint64_t(64) << 20; // 64 MiB
+  uint64_t constexpr kDefaultBufferLength = uint64_t(4) << 20;
+
+  auto out = addOutputBuffer(
+      &stream,
+      maxCompressedLength <= kMaxSingleStepLength ? maxCompressedLength
+                                                  : kDefaultBufferLength);
+
+  for (auto range : *data) {
+    while (!range.empty()) {
+      auto const inSize = std::min<size_t>(range.size(), kMaxSingleStepLength);
+      stream.next_in =
+          const_cast<char*>(reinterpret_cast<const char*>(range.data()));
+      stream.avail_in = inSize;
+
+      if (stream.avail_out == 0) {
+        out->prependChain(addOutputBuffer(&stream, kDefaultBufferLength));
+      }
+
+      bzCheck(BZ2_bzCompress(&stream, BZ_RUN));
+      range.uncheckedAdvance(inSize - stream.avail_in);
+    }
+  }
+  do {
+    if (stream.avail_out == 0) {
+      out->prependChain(addOutputBuffer(&stream, kDefaultBufferLength));
+    }
+  } while (bzCheck(BZ2_bzCompress(&stream, BZ_FINISH)) != BZ_STREAM_END);
+
+  out->prev()->trimEnd(stream.avail_out);
+
+  return out;
+}
+
+std::unique_ptr<IOBuf> Bzip2Codec::doUncompress(
+    const IOBuf* data,
+    uint64_t uncompressedLength) {
+  bz_stream stream = createBzStream();
+  bzCheck(BZ2_bzDecompressInit(&stream, 0, 0));
+  SCOPE_EXIT {
+    bzCheck(BZ2_bzDecompressEnd(&stream));
+  };
+
+  uint64_t constexpr kMaxSingleStepLength = uint64_t(64) << 20; // 64 MiB
+  uint64_t const kBlockSize = uint64_t(100) << 10; // 100 KiB
+  uint64_t const kDefaultBufferLength =
+      computeBufferLength(data->computeChainDataLength(), kBlockSize);
+
+  auto out = addOutputBuffer(
+      &stream,
+      ((uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
+        uncompressedLength <= kMaxSingleStepLength)
+           ? uncompressedLength
+           : kDefaultBufferLength));
+
+  int rc = BZ_OK;
+  for (auto range : *data) {
+    while (!range.empty()) {
+      auto const inSize = std::min<size_t>(range.size(), kMaxSingleStepLength);
+      stream.next_in =
+          const_cast<char*>(reinterpret_cast<const char*>(range.data()));
+      stream.avail_in = inSize;
+
+      if (stream.avail_out == 0) {
+        out->prependChain(addOutputBuffer(&stream, kDefaultBufferLength));
+      }
+
+      rc = bzCheck(BZ2_bzDecompress(&stream));
+      range.uncheckedAdvance(inSize - stream.avail_in);
+    }
+  }
+  while (rc != BZ_STREAM_END) {
+    if (stream.avail_out != 0) { // input exhausted but output space remains
+      throw std::runtime_error("Bzip2 error: Truncated input");
+    }
+    out->prependChain(addOutputBuffer(&stream, kDefaultBufferLength));
+    rc = bzCheck(BZ2_bzDecompress(&stream));
+  }
+
+  out->prev()->trimEnd(stream.avail_out);
+
+  uint64_t const totalOut =
+      (uint64_t(stream.total_out_hi32) << 32) + stream.total_out_lo32;
+  if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
+      uncompressedLength != totalOut) {
+    throw std::runtime_error("Bzip2 error: Invalid uncompressed length");
+  }
+
+  return out;
+}
+
+#endif // FOLLY_HAVE_LIBBZ2
+
 /**
  * Automatic decompression
  */
@@ -1630,6 +1842,7 @@ AutomaticCodec::AutomaticCodec(std::vector<std::unique_ptr<Codec>> customCodecs)
   addCodecIfSupported(CodecType::ZLIB);
   addCodecIfSupported(CodecType::GZIP);
   addCodecIfSupported(CodecType::LZMA2);
+  addCodecIfSupported(CodecType::BZIP2);
   if (kIsDebug) {
     checkCompatibleCodecs();
   }
@@ -1767,6 +1980,12 @@ static constexpr CodecFactory
 #else
   nullptr,
 #endif
+
+#if FOLLY_HAVE_LIBBZ2
+  Bzip2Codec::create,
+#else
+  nullptr,
+#endif
 };
 
 bool hasCodec(CodecType type) {
diff --git a/folly/io/Compression.h b/folly/io/Compression.h
index c46c6164..99963c74 100644
--- a/folly/io/Compression.h
+++ b/folly/io/Compression.h
@@ -93,7 +93,13 @@ enum class CodecType {
    */
   LZ4_FRAME = 10,
 
-  NUM_CODEC_TYPES = 11,
+  /**
+   * Use bzip2 compression.
+   * Levels supported: 1 = fast, 9 = best; default = 9
+   */
+  BZIP2 = 11,
+
+  NUM_CODEC_TYPES = 12,
 };
 
 class Codec {
@@ -230,7 +236,7 @@ std::unique_ptr<Codec> getCodec(CodecType type,
 
 /**
  * Returns a codec that can uncompress any of the given codec types as well as
- * {LZ4_FRAME, ZSTD, ZLIB, GZIP, LZMA2}. Appends each default codec to
+ * {LZ4_FRAME, ZSTD, ZLIB, GZIP, LZMA2, BZIP2}. Appends each default codec to
  * customCodecs in order, so long as a codec with the same type() isn't already
  * present. When uncompress() is called, each codec's canUncompress() is called
  * in the order that they are given. Appended default codecs are checked last.
diff --git a/folly/io/test/CompressionTest.cpp b/folly/io/test/CompressionTest.cpp
index 197d50fd..f378ab44 100644
--- a/folly/io/test/CompressionTest.cpp
+++ b/folly/io/test/CompressionTest.cpp
@@ -161,6 +161,7 @@ TEST(CompressionTestNeedsUncompressedLength, Simple) {
     { CodecType::ZSTD, false },
     { CodecType::GZIP, false },
     { CodecType::LZ4_FRAME, false },
+    { CodecType::BZIP2, false },
   };
 
   for (auto const& test : expectations) {
@@ -396,6 +397,7 @@ INSTANTIATE_TEST_CASE_P(
             CodecType::LZMA2,
             CodecType::ZSTD,
             CodecType::LZ4_FRAME,
+            CodecType::BZIP2,
         })));
 
 class AutomaticCodecTest : public testing::TestWithParam<CodecType> {
@@ -584,7 +586,8 @@ INSTANTIATE_TEST_CASE_P(
         CodecType::ZSTD,
         CodecType::ZLIB,
         CodecType::GZIP,
-        CodecType::LZMA2));
+        CodecType::LZMA2,
+        CodecType::BZIP2));
 
 TEST(ValidPrefixesTest, CustomCodec) {
   std::vector<std::unique_ptr<Codec>> codecs;
-- 
2.34.1