X-Git-Url: http://demsky.eecs.uci.edu/git/?a=blobdiff_plain;f=folly%2Fio%2FCompression.cpp;h=c7c4c8f8dd05d22ce743094a9404db0b69eaddde;hb=db777d6b97ec08fdbf461679acf46a9d8ffab62a;hp=cbfaf68c8eb39b5014a05ed49cc87e8362d5ed47;hpb=edec0c3fd0bb5f709eb8988fed2c782177274197;p=folly.git diff --git a/folly/io/Compression.cpp b/folly/io/Compression.cpp index cbfaf68c..c7c4c8f8 100644 --- a/folly/io/Compression.cpp +++ b/folly/io/Compression.cpp @@ -1,5 +1,5 @@ /* - * Copyright 2014 Facebook, Inc. + * Copyright 2016 Facebook, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -36,6 +36,10 @@ #include #endif +#if FOLLY_HAVE_LIBZSTD +#include +#endif + #include #include #include @@ -101,16 +105,16 @@ namespace { /** * No compression */ -class NoCompressionCodec FOLLY_FINAL : public Codec { +class NoCompressionCodec final : public Codec { public: static std::unique_ptr create(int level, CodecType type); explicit NoCompressionCodec(int level, CodecType type); private: - std::unique_ptr doCompress(const IOBuf* data) FOLLY_OVERRIDE; + std::unique_ptr doCompress(const IOBuf* data) override; std::unique_ptr doUncompress( const IOBuf* data, - uint64_t uncompressedLength) FOLLY_OVERRIDE; + uint64_t uncompressedLength) override; }; std::unique_ptr NoCompressionCodec::create(int level, CodecType type) { @@ -148,6 +152,8 @@ std::unique_ptr NoCompressionCodec::doUncompress( return data->clone(); } +#if (FOLLY_HAVE_LIBLZ4 || FOLLY_HAVE_LIBLZMA) + namespace { void encodeVarintToIOBuf(uint64_t val, folly::IOBuf* out) { @@ -155,37 +161,46 @@ void encodeVarintToIOBuf(uint64_t val, folly::IOBuf* out) { out->append(encodeVarint(val, out->writableTail())); } -uint64_t decodeVarintFromCursor(folly::io::Cursor& cursor) { - // Must have enough room in *this* buffer. - auto p = cursor.peek(); - folly::ByteRange range(p.first, p.second); - uint64_t val = decodeVarint(range); - cursor.skip(range.data() - p.first); +inline uint64_t decodeVarintFromCursor(folly::io::Cursor& cursor) { + uint64_t val = 0; + int8_t b = 0; + for (int shift = 0; shift <= 63; shift += 7) { + b = cursor.read(); + val |= static_cast(b & 0x7f) << shift; + if (b >= 0) { + break; + } + } + if (b < 0) { + throw std::invalid_argument("Invalid varint value. Too big."); + } return val; } } // namespace +#endif // FOLLY_HAVE_LIBLZ4 || FOLLY_HAVE_LIBLZMA + #if FOLLY_HAVE_LIBLZ4 /** * LZ4 compression */ -class LZ4Codec FOLLY_FINAL : public Codec { +class LZ4Codec final : public Codec { public: static std::unique_ptr create(int level, CodecType type); explicit LZ4Codec(int level, CodecType type); private: - bool doNeedsUncompressedLength() const FOLLY_OVERRIDE; - uint64_t doMaxUncompressedLength() const FOLLY_OVERRIDE; + bool doNeedsUncompressedLength() const override; + uint64_t doMaxUncompressedLength() const override; bool encodeSize() const { return type() == CodecType::LZ4_VARINT_SIZE; } - std::unique_ptr doCompress(const IOBuf* data) FOLLY_OVERRIDE; + std::unique_ptr doCompress(const IOBuf* data) override; std::unique_ptr doUncompress( const IOBuf* data, - uint64_t uncompressedLength) FOLLY_OVERRIDE; + uint64_t uncompressedLength) override; bool highCompression_; }; @@ -245,13 +260,18 @@ std::unique_ptr LZ4Codec::doCompress(const IOBuf* data) { int n; if (highCompression_) { - n = LZ4_compressHC(reinterpret_cast(data->data()), - reinterpret_cast(out->writableTail()), - data->length()); + n = LZ4_compress_HC( + reinterpret_cast(data->data()), + reinterpret_cast(out->writableTail()), + data->length(), + out->tailroom(), + 0); } else { - n = LZ4_compress(reinterpret_cast(data->data()), - reinterpret_cast(out->writableTail()), - data->length()); + n = LZ4_compress_default( + reinterpret_cast(data->data()), + reinterpret_cast(out->writableTail()), + data->length(), + out->tailroom()); } CHECK_GE(n, 0); @@ -288,14 +308,15 @@ std::unique_ptr LZ4Codec::doUncompress( } } - auto p = cursor.peek(); + auto sp = StringPiece{cursor.peekBytes()}; auto out = IOBuf::create(actualUncompressedLength); - int n = LZ4_decompress_safe(reinterpret_cast(p.first), - reinterpret_cast(out->writableTail()), - p.second, - actualUncompressedLength); + int n = LZ4_decompress_safe( + sp.data(), + reinterpret_cast(out->writableTail()), + sp.size(), + actualUncompressedLength); - if (n != actualUncompressedLength) { + if (n < 0 || uint64_t(n) != actualUncompressedLength) { throw std::runtime_error(to( "LZ4 decompression returned invalid value ", n)); } @@ -314,12 +335,12 @@ std::unique_ptr LZ4Codec::doUncompress( /** * Implementation of snappy::Source that reads from a IOBuf chain. */ -class IOBufSnappySource FOLLY_FINAL : public snappy::Source { +class IOBufSnappySource final : public snappy::Source { public: explicit IOBufSnappySource(const IOBuf* data); - size_t Available() const FOLLY_OVERRIDE; - const char* Peek(size_t* len) FOLLY_OVERRIDE; - void Skip(size_t n) FOLLY_OVERRIDE; + size_t Available() const override; + const char* Peek(size_t* len) override; + void Skip(size_t n) override; private: size_t available_; io::Cursor cursor_; @@ -335,9 +356,9 @@ size_t IOBufSnappySource::Available() const { } const char* IOBufSnappySource::Peek(size_t* len) { - auto p = cursor_.peek(); - *len = p.second; - return reinterpret_cast(p.first); + auto sp = StringPiece{cursor_.peekBytes()}; + *len = sp.size(); + return sp.data(); } void IOBufSnappySource::Skip(size_t n) { @@ -346,17 +367,17 @@ void IOBufSnappySource::Skip(size_t n) { available_ -= n; } -class SnappyCodec FOLLY_FINAL : public Codec { +class SnappyCodec final : public Codec { public: static std::unique_ptr create(int level, CodecType type); explicit SnappyCodec(int level, CodecType type); private: - uint64_t doMaxUncompressedLength() const FOLLY_OVERRIDE; - std::unique_ptr doCompress(const IOBuf* data) FOLLY_OVERRIDE; + uint64_t doMaxUncompressedLength() const override; + std::unique_ptr doCompress(const IOBuf* data) override; std::unique_ptr doUncompress( const IOBuf* data, - uint64_t uncompressedLength) FOLLY_OVERRIDE; + uint64_t uncompressedLength) override; }; std::unique_ptr SnappyCodec::create(int level, CodecType type) { @@ -432,16 +453,16 @@ std::unique_ptr SnappyCodec::doUncompress(const IOBuf* data, /** * Zlib codec */ -class ZlibCodec FOLLY_FINAL : public Codec { +class ZlibCodec final : public Codec { public: static std::unique_ptr create(int level, CodecType type); explicit ZlibCodec(int level, CodecType type); private: - std::unique_ptr doCompress(const IOBuf* data) FOLLY_OVERRIDE; + std::unique_ptr doCompress(const IOBuf* data) override; std::unique_ptr doUncompress( const IOBuf* data, - uint64_t uncompressedLength) FOLLY_OVERRIDE; + uint64_t uncompressedLength) override; std::unique_ptr addOutputBuffer(z_stream* stream, uint32_t length); bool doInflate(z_stream* stream, IOBuf* head, uint32_t bufferLength); @@ -454,7 +475,7 @@ std::unique_ptr ZlibCodec::create(int level, CodecType type) { } ZlibCodec::ZlibCodec(int level, CodecType type) : Codec(type) { - DCHECK(type == CodecType::ZLIB); + DCHECK(type == CodecType::ZLIB || type == CodecType::GZIP); switch (level) { case COMPRESSION_LEVEL_FASTEST: level = 1; @@ -519,7 +540,22 @@ std::unique_ptr ZlibCodec::doCompress(const IOBuf* data) { stream.zfree = nullptr; stream.opaque = nullptr; - int rc = deflateInit(&stream, level_); + // Using deflateInit2() to support gzip. "The windowBits parameter is the + // base two logarithm of the maximum window size (...) The default value is + // 15 (...) Add 16 to windowBits to write a simple gzip header and trailer + // around the compressed data instead of a zlib wrapper. The gzip header + // will have no file name, no extra data, no comment, no modification time + // (set to zero), no header crc, and the operating system will be set to 255 + // (unknown)." + int windowBits = 15 + (type() == CodecType::GZIP ? 16 : 0); + // All other parameters (method, memLevel, strategy) get default values from + // the zlib manual. + int rc = deflateInit2(&stream, + level_, + Z_DEFLATED, + windowBits, + /* memLevel */ 8, + Z_DEFAULT_STRATEGY); if (rc != Z_OK) { throw std::runtime_error(to( "ZlibCodec: deflateInit error: ", rc, ": ", stream.msg)); @@ -532,7 +568,7 @@ std::unique_ptr ZlibCodec::doCompress(const IOBuf* data) { bool success = false; SCOPE_EXIT { - int rc = deflateEnd(&stream); + rc = deflateEnd(&stream); // If we're here because of an exception, it's okay if some data // got dropped. CHECK(rc == Z_OK || (!success && rc == Z_DATA_ERROR)) @@ -553,21 +589,25 @@ std::unique_ptr ZlibCodec::doCompress(const IOBuf* data) { defaultBufferLength)); for (auto& range : *data) { - if (range.empty()) { - continue; - } - - stream.next_in = const_cast(range.data()); - stream.avail_in = range.size(); - - while (stream.avail_in != 0) { - if (stream.avail_out == 0) { - out->prependChain(addOutputBuffer(&stream, defaultBufferLength)); + uint64_t remaining = range.size(); + uint64_t written = 0; + while (remaining) { + uint32_t step = (remaining > maxSingleStepLength ? + maxSingleStepLength : remaining); + stream.next_in = const_cast(range.data() + written); + stream.avail_in = step; + remaining -= step; + written += step; + + while (stream.avail_in != 0) { + if (stream.avail_out == 0) { + out->prependChain(addOutputBuffer(&stream, defaultBufferLength)); + } + + rc = deflate(&stream, Z_NO_FLUSH); + + CHECK_EQ(rc, Z_OK) << stream.msg; } - - rc = deflate(&stream, Z_NO_FLUSH); - - CHECK_EQ(rc, Z_OK) << stream.msg; } } @@ -595,7 +635,11 @@ std::unique_ptr ZlibCodec::doUncompress(const IOBuf* data, stream.zfree = nullptr; stream.opaque = nullptr; - int rc = inflateInit(&stream); + // "The windowBits parameter is the base two logarithm of the maximum window + // size (...) The default value is 15 (...) add 16 to decode only the gzip + // format (the zlib format will return a Z_DATA_ERROR)." + int windowBits = 15 + (type() == CodecType::GZIP ? 16 : 0); + int rc = inflateInit2(&stream, windowBits); if (rc != Z_OK) { throw std::runtime_error(to( "ZlibCodec: inflateInit error: ", rc, ": ", stream.msg)); @@ -608,7 +652,7 @@ std::unique_ptr ZlibCodec::doUncompress(const IOBuf* data, bool success = false; SCOPE_EXIT { - int rc = inflateEnd(&stream); + rc = inflateEnd(&stream); // If we're here because of an exception, it's okay if some data // got dropped. CHECK(rc == Z_OK || (!success && rc == Z_DATA_ERROR)) @@ -669,21 +713,21 @@ std::unique_ptr ZlibCodec::doUncompress(const IOBuf* data, /** * LZMA2 compression */ -class LZMA2Codec FOLLY_FINAL : public Codec { +class LZMA2Codec final : public Codec { public: static std::unique_ptr create(int level, CodecType type); explicit LZMA2Codec(int level, CodecType type); private: - bool doNeedsUncompressedLength() const FOLLY_OVERRIDE; - uint64_t doMaxUncompressedLength() const FOLLY_OVERRIDE; + bool doNeedsUncompressedLength() const override; + uint64_t doMaxUncompressedLength() const override; bool encodeSize() const { return type() == CodecType::LZMA2_VARINT_SIZE; } - std::unique_ptr doCompress(const IOBuf* data) FOLLY_OVERRIDE; + std::unique_ptr doCompress(const IOBuf* data) override; std::unique_ptr doUncompress( const IOBuf* data, - uint64_t uncompressedLength) FOLLY_OVERRIDE; + uint64_t uncompressedLength) override; std::unique_ptr addOutputBuffer(lzma_stream* stream, size_t length); bool doInflate(lzma_stream* stream, IOBuf* head, size_t bufferLength); @@ -870,10 +914,10 @@ std::unique_ptr LZMA2Codec::doUncompress(const IOBuf* data, defaultBufferLength)); bool streamEnd = false; - auto buf = cursor.peek(); - while (buf.second != 0) { - stream.next_in = const_cast(buf.first); - stream.avail_in = buf.second; + auto buf = cursor.peekBytes(); + while (!buf.empty()) { + stream.next_in = const_cast(buf.data()); + stream.avail_in = buf.size(); while (stream.avail_in != 0) { if (streamEnd) { @@ -884,8 +928,8 @@ std::unique_ptr LZMA2Codec::doUncompress(const IOBuf* data, streamEnd = doInflate(&stream, out.get(), defaultBufferLength); } - cursor.skip(buf.second); - buf = cursor.peek(); + cursor.skip(buf.size()); + buf = cursor.peekBytes(); } while (!streamEnd) { @@ -904,55 +948,247 @@ std::unique_ptr LZMA2Codec::doUncompress(const IOBuf* data, #endif // FOLLY_HAVE_LIBLZMA -typedef std::unique_ptr (*CodecFactory)(int, CodecType); +#ifdef FOLLY_HAVE_LIBZSTD + +/** + * ZSTD compression + */ +class ZSTDCodec final : public Codec { + public: + static std::unique_ptr create(int level, CodecType); + explicit ZSTDCodec(int level, CodecType type); + + private: + bool doNeedsUncompressedLength() const override; + std::unique_ptr doCompress(const IOBuf* data) override; + std::unique_ptr doUncompress( + const IOBuf* data, + uint64_t uncompressedLength) override; + + int level_; +}; + +std::unique_ptr ZSTDCodec::create(int level, CodecType type) { + return make_unique(level, type); +} + +ZSTDCodec::ZSTDCodec(int level, CodecType type) : Codec(type) { + DCHECK(type == CodecType::ZSTD); + switch (level) { + case COMPRESSION_LEVEL_FASTEST: + level = 1; + break; + case COMPRESSION_LEVEL_DEFAULT: + level = 1; + break; + case COMPRESSION_LEVEL_BEST: + level = 19; + break; + } + if (level < 1 || level > ZSTD_maxCLevel()) { + throw std::invalid_argument( + to("ZSTD: invalid level: ", level)); + } + level_ = level; +} + +bool ZSTDCodec::doNeedsUncompressedLength() const { + return false; +} + +void zstdThrowIfError(size_t rc) { + if (!ZSTD_isError(rc)) { + return; + } + throw std::runtime_error( + to("ZSTD returned an error: ", ZSTD_getErrorName(rc))); +} + +std::unique_ptr ZSTDCodec::doCompress(const IOBuf* data) { + // Support earlier versions of the codec (working with a single IOBuf, + // and using ZSTD_decompress which requires ZSTD frame to contain size, + // which isn't populated by streaming API). + if (!data->isChained()) { + auto out = IOBuf::createCombined(ZSTD_compressBound(data->length())); + const auto rc = ZSTD_compress( + out->writableData(), + out->capacity(), + data->data(), + data->length(), + level_); + zstdThrowIfError(rc); + out->append(rc); + return out; + } + + auto zcs = ZSTD_createCStream(); + SCOPE_EXIT { + ZSTD_freeCStream(zcs); + }; + + auto rc = ZSTD_initCStream(zcs, level_); + zstdThrowIfError(rc); + + Cursor cursor(data); + auto result = IOBuf::createCombined(ZSTD_compressBound(cursor.totalLength())); + + ZSTD_outBuffer out; + out.dst = result->writableTail(); + out.size = result->capacity(); + out.pos = 0; + + for (auto buffer = cursor.peekBytes(); !buffer.empty();) { + ZSTD_inBuffer in; + in.src = buffer.data(); + in.size = buffer.size(); + for (in.pos = 0; in.pos != in.size;) { + rc = ZSTD_compressStream(zcs, &out, &in); + zstdThrowIfError(rc); + } + cursor.skip(in.size); + buffer = cursor.peekBytes(); + } + + rc = ZSTD_endStream(zcs, &out); + zstdThrowIfError(rc); + CHECK_EQ(rc, 0); -CodecFactory gCodecFactories[ + result->append(out.pos); + return result; +} + +std::unique_ptr ZSTDCodec::doUncompress( + const IOBuf* data, + uint64_t uncompressedLength) { + auto zds = ZSTD_createDStream(); + SCOPE_EXIT { + ZSTD_freeDStream(zds); + }; + + auto rc = ZSTD_initDStream(zds); + zstdThrowIfError(rc); + + ZSTD_outBuffer out{}; + ZSTD_inBuffer in{}; + + auto outputSize = ZSTD_DStreamOutSize(); + if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH) { + outputSize = uncompressedLength; + } else { + auto decompressedSize = + ZSTD_getDecompressedSize(data->data(), data->length()); + if (decompressedSize != 0 && decompressedSize < outputSize) { + outputSize = decompressedSize; + } + } + + IOBufQueue queue(IOBufQueue::cacheChainLength()); + + Cursor cursor(data); + for (rc = 0;;) { + if (in.pos == in.size) { + auto buffer = cursor.peekBytes(); + in.src = buffer.data(); + in.size = buffer.size(); + in.pos = 0; + cursor.skip(in.size); + if (rc > 1 && in.size == 0) { + throw std::runtime_error(to("ZSTD: incomplete input")); + } + } + if (out.pos == out.size) { + if (out.pos != 0) { + queue.postallocate(out.pos); + } + auto buffer = queue.preallocate(outputSize, outputSize); + out.dst = buffer.first; + out.size = buffer.second; + out.pos = 0; + outputSize = ZSTD_DStreamOutSize(); + } + rc = ZSTD_decompressStream(zds, &out, &in); + zstdThrowIfError(rc); + if (rc == 0) { + break; + } + } + if (out.pos != 0) { + queue.postallocate(out.pos); + } + if (in.pos != in.size || !cursor.isAtEnd()) { + throw std::runtime_error("ZSTD: junk after end of data"); + } + if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH && + queue.chainLength() != uncompressedLength) { + throw std::runtime_error("ZSTD: invalid uncompressed length"); + } + + return queue.move(); +} + +#endif // FOLLY_HAVE_LIBZSTD + +} // namespace + +std::unique_ptr getCodec(CodecType type, int level) { + typedef std::unique_ptr (*CodecFactory)(int, CodecType); + + static CodecFactory codecFactories[ static_cast(CodecType::NUM_CODEC_TYPES)] = { - nullptr, // USER_DEFINED - NoCompressionCodec::create, + nullptr, // USER_DEFINED + NoCompressionCodec::create, #if FOLLY_HAVE_LIBLZ4 - LZ4Codec::create, + LZ4Codec::create, #else - nullptr, + nullptr, #endif #if FOLLY_HAVE_LIBSNAPPY - SnappyCodec::create, + SnappyCodec::create, #else - nullptr, + nullptr, #endif #if FOLLY_HAVE_LIBZ - ZlibCodec::create, + ZlibCodec::create, #else - nullptr, + nullptr, #endif #if FOLLY_HAVE_LIBLZ4 - LZ4Codec::create, + LZ4Codec::create, #else - nullptr, + nullptr, #endif #if FOLLY_HAVE_LIBLZMA - LZMA2Codec::create, - LZMA2Codec::create, + LZMA2Codec::create, + LZMA2Codec::create, #else - nullptr, - nullptr, + nullptr, + nullptr, #endif -}; -} // namespace +#if FOLLY_HAVE_LIBZSTD + ZSTDCodec::create, +#else + nullptr, +#endif + +#if FOLLY_HAVE_LIBZ + ZlibCodec::create, +#else + nullptr, +#endif + }; -std::unique_ptr getCodec(CodecType type, int level) { size_t idx = static_cast(type); if (idx >= static_cast(CodecType::NUM_CODEC_TYPES)) { throw std::invalid_argument(to( "Compression type ", idx, " not supported")); } - auto factory = gCodecFactories[idx]; + auto factory = codecFactories[idx]; if (!factory) { throw std::invalid_argument(to( "Compression type ", idx, " not supported")); @@ -963,4 +1199,3 @@ std::unique_ptr getCodec(CodecType type, int level) { } }} // namespaces -