2 * Copyright 2017 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #include <folly/io/Compression.h>
22 #if LZ4_VERSION_NUMBER >= 10301
27 #include <glog/logging.h>
29 #if FOLLY_HAVE_LIBSNAPPY
31 #include <snappy-sinksource.h>
38 #if FOLLY_HAVE_LIBLZMA
42 #if FOLLY_HAVE_LIBZSTD
46 #include <folly/Conv.h>
47 #include <folly/Memory.h>
48 #include <folly/Portability.h>
49 #include <folly/ScopeGuard.h>
50 #include <folly/Varint.h>
51 #include <folly/io/Cursor.h>
53 namespace folly { namespace io {
55 Codec::Codec(CodecType type) : type_(type) { }
57 // Ensure consistent behavior in the nullptr case
58 std::unique_ptr<IOBuf> Codec::compress(const IOBuf* data) {
59 uint64_t len = data->computeChainDataLength();
61 return IOBuf::create(0);
63 if (len > maxUncompressedLength()) {
64 throw std::runtime_error("Codec: uncompressed length too large");
67 return doCompress(data);
70 std::string Codec::compress(const StringPiece data) {
71 const uint64_t len = data.size();
75 if (len > maxUncompressedLength()) {
76 throw std::runtime_error("Codec: uncompressed length too large");
79 return doCompressString(data);
82 std::unique_ptr<IOBuf> Codec::uncompress(const IOBuf* data,
83 uint64_t uncompressedLength) {
84 if (uncompressedLength == UNKNOWN_UNCOMPRESSED_LENGTH) {
85 if (needsUncompressedLength()) {
86 throw std::invalid_argument("Codec: uncompressed length required");
88 } else if (uncompressedLength > maxUncompressedLength()) {
89 throw std::runtime_error("Codec: uncompressed length too large");
93 if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
94 uncompressedLength != 0) {
95 throw std::runtime_error("Codec: invalid uncompressed length");
97 return IOBuf::create(0);
100 return doUncompress(data, uncompressedLength);
103 std::string Codec::uncompress(
104 const StringPiece data,
105 uint64_t uncompressedLength) {
106 if (uncompressedLength == UNKNOWN_UNCOMPRESSED_LENGTH) {
107 if (needsUncompressedLength()) {
108 throw std::invalid_argument("Codec: uncompressed length required");
110 } else if (uncompressedLength > maxUncompressedLength()) {
111 throw std::runtime_error("Codec: uncompressed length too large");
115 if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
116 uncompressedLength != 0) {
117 throw std::runtime_error("Codec: invalid uncompressed length");
122 return doUncompressString(data, uncompressedLength);
125 bool Codec::needsUncompressedLength() const {
126 return doNeedsUncompressedLength();
129 uint64_t Codec::maxUncompressedLength() const {
130 return doMaxUncompressedLength();
// Base-class implementation of the hook that needsUncompressedLength()
// forwards to; LZ4Codec, LZMA2Codec and ZSTDCodec declare overrides.
133 bool Codec::doNeedsUncompressedLength() const {
137 uint64_t Codec::doMaxUncompressedLength() const {
138 return UNLIMITED_UNCOMPRESSED_LENGTH;
141 std::string Codec::doCompressString(const StringPiece data) {
142 const IOBuf inputBuffer{IOBuf::WRAP_BUFFER, data};
143 auto outputBuffer = doCompress(&inputBuffer);
145 output.reserve(outputBuffer->computeChainDataLength());
146 for (auto range : *outputBuffer) {
147 output.append(reinterpret_cast<const char*>(range.data()), range.size());
152 std::string Codec::doUncompressString(
153 const StringPiece data,
154 uint64_t uncompressedLength) {
155 const IOBuf inputBuffer{IOBuf::WRAP_BUFFER, data};
156 auto outputBuffer = doUncompress(&inputBuffer, uncompressedLength);
158 output.reserve(outputBuffer->computeChainDataLength());
159 for (auto range : *outputBuffer) {
160 output.append(reinterpret_cast<const char*>(range.data()), range.size());
// Codec for CodecType::NO_COMPRESSION. Its doCompress()/doUncompress()
// implementations return a clone of the input chain.
170 class NoCompressionCodec final : public Codec {
// Factory with the (level, type) shape stored in the codecFactories table.
172 static std::unique_ptr<Codec> create(int level, CodecType type);
173 explicit NoCompressionCodec(int level, CodecType type);
// Codec hooks.
176 std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
177 std::unique_ptr<IOBuf> doUncompress(
179 uint64_t uncompressedLength) override;
182 std::unique_ptr<Codec> NoCompressionCodec::create(int level, CodecType type) {
183 return make_unique<NoCompressionCodec>(level, type);
// Validates the requested level for the pass-through codec. The three
// COMPRESSION_LEVEL_* aliases are handled by the switch; a level that is
// still invalid afterwards raises std::invalid_argument.
186 NoCompressionCodec::NoCompressionCodec(int level, CodecType type)
// Debug-only sanity check that the factory was invoked for the right type.
188 DCHECK(type == CodecType::NO_COMPRESSION);
190 case COMPRESSION_LEVEL_DEFAULT:
191 case COMPRESSION_LEVEL_FASTEST:
192 case COMPRESSION_LEVEL_BEST:
196 throw std::invalid_argument(to<std::string>(
197 "NoCompressionCodec: invalid level ", level));
201 std::unique_ptr<IOBuf> NoCompressionCodec::doCompress(
203 return data->clone();
206 std::unique_ptr<IOBuf> NoCompressionCodec::doUncompress(
208 uint64_t uncompressedLength) {
209 if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
210 data->computeChainDataLength() != uncompressedLength) {
211 throw std::runtime_error(to<std::string>(
212 "NoCompressionCodec: invalid uncompressed length"));
214 return data->clone();
217 #if (FOLLY_HAVE_LIBLZ4 || FOLLY_HAVE_LIBLZMA)
221 void encodeVarintToIOBuf(uint64_t val, folly::IOBuf* out) {
222 DCHECK_GE(out->tailroom(), kMaxVarintLength64);
223 out->append(encodeVarint(val, out->writableTail()));
226 inline uint64_t decodeVarintFromCursor(folly::io::Cursor& cursor) {
229 for (int shift = 0; shift <= 63; shift += 7) {
230 b = cursor.read<int8_t>();
231 val |= static_cast<uint64_t>(b & 0x7f) << shift;
237 throw std::invalid_argument("Invalid varint value. Too big.");
244 #endif // FOLLY_HAVE_LIBLZ4 || FOLLY_HAVE_LIBLZMA
246 #if FOLLY_HAVE_LIBLZ4
// Codec for CodecType::LZ4 and CodecType::LZ4_VARINT_SIZE (block-mode LZ4).
251 class LZ4Codec final : public Codec {
// Factory with the (level, type) shape stored in the codecFactories table.
253 static std::unique_ptr<Codec> create(int level, CodecType type);
254 explicit LZ4Codec(int level, CodecType type);
// Codec hooks.
257 bool doNeedsUncompressedLength() const override;
258 uint64_t doMaxUncompressedLength() const override;
// True for the LZ4_VARINT_SIZE flavour, where the uncompressed size is
// written as a varint in front of the compressed payload.
260 bool encodeSize() const { return type() == CodecType::LZ4_VARINT_SIZE; }
262 std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
263 std::unique_ptr<IOBuf> doUncompress(
265 uint64_t uncompressedLength) override;
// Selects the HC compression entry points in doCompress().
267 bool highCompression_;
270 std::unique_ptr<Codec> LZ4Codec::create(int level, CodecType type) {
271 return make_unique<LZ4Codec>(level, type);
// Translates the requested level into the highCompression_ flag. The
// COMPRESSION_LEVEL_* aliases are handled by the switch; after that only
// levels 1 and 2 are accepted, and any level above 1 enables high compression.
274 LZ4Codec::LZ4Codec(int level, CodecType type) : Codec(type) {
275 DCHECK(type == CodecType::LZ4 || type == CodecType::LZ4_VARINT_SIZE);
278 case COMPRESSION_LEVEL_FASTEST:
279 case COMPRESSION_LEVEL_DEFAULT:
282 case COMPRESSION_LEVEL_BEST:
// Anything outside [1, 2] is rejected with std::invalid_argument.
286 if (level < 1 || level > 2) {
287 throw std::invalid_argument(to<std::string>(
288 "LZ4Codec: invalid level: ", level));
290 highCompression_ = (level > 1);
293 bool LZ4Codec::doNeedsUncompressedLength() const {
294 return !encodeSize();
297 // The value comes from lz4.h in lz4-r117, but older versions of lz4 don't
298 // define LZ4_MAX_INPUT_SIZE (even though the max size is the same), so do it here.
300 #ifndef LZ4_MAX_INPUT_SIZE
301 # define LZ4_MAX_INPUT_SIZE 0x7E000000
304 uint64_t LZ4Codec::doMaxUncompressedLength() const {
305 return LZ4_MAX_INPUT_SIZE;
// Compresses `data` with the LZ4 block API. Chained input is coalesced first.
// For the varint flavour, the uncompressed length is written ahead of the
// compressed bytes.
308 std::unique_ptr<IOBuf> LZ4Codec::doCompress(const IOBuf* data) {
310 if (data->isChained()) {
311 // LZ4 doesn't support streaming, so we have to coalesce
312 clone = data->cloneCoalescedAsValue();
// Reserve room for the optional varint header plus LZ4's worst-case output.
316 uint32_t extraSize = encodeSize() ? kMaxVarintLength64 : 0;
317 auto out = IOBuf::create(extraSize + LZ4_compressBound(data->length()));
319 encodeVarintToIOBuf(data->length(), out.get());
323 auto input = reinterpret_cast<const char*>(data->data());
324 auto output = reinterpret_cast<char*>(out->writableTail());
325 const auto inputLength = data->length();
// lz4 >= 1.7.0 gets the entry points that take an explicit output capacity;
// the calls below them are used for older releases.
326 #if LZ4_VERSION_NUMBER >= 10700
327 if (highCompression_) {
328 n = LZ4_compress_HC(input, output, inputLength, out->tailroom(), 0);
330 n = LZ4_compress_default(input, output, inputLength, out->tailroom());
333 if (highCompression_) {
334 n = LZ4_compressHC(input, output, inputLength);
336 n = LZ4_compress(input, output, inputLength);
// Fatal check that the reported size fits within the output buffer.
341 CHECK_LE(n, out->capacity());
347 std::unique_ptr<IOBuf> LZ4Codec::doUncompress(
349 uint64_t uncompressedLength) {
351 if (data->isChained()) {
352 // LZ4 doesn't support streaming, so we have to coalesce
353 clone = data->cloneCoalescedAsValue();
357 folly::io::Cursor cursor(data);
358 uint64_t actualUncompressedLength;
360 actualUncompressedLength = decodeVarintFromCursor(cursor);
361 if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
362 uncompressedLength != actualUncompressedLength) {
363 throw std::runtime_error("LZ4Codec: invalid uncompressed length");
366 actualUncompressedLength = uncompressedLength;
367 if (actualUncompressedLength == UNKNOWN_UNCOMPRESSED_LENGTH ||
368 actualUncompressedLength > maxUncompressedLength()) {
369 throw std::runtime_error("LZ4Codec: invalid uncompressed length");
373 auto sp = StringPiece{cursor.peekBytes()};
374 auto out = IOBuf::create(actualUncompressedLength);
375 int n = LZ4_decompress_safe(
377 reinterpret_cast<char*>(out->writableTail()),
379 actualUncompressedLength);
381 if (n < 0 || uint64_t(n) != actualUncompressedLength) {
382 throw std::runtime_error(to<std::string>(
383 "LZ4 decompression returned invalid value ", n));
385 out->append(actualUncompressedLength);
389 #if LZ4_VERSION_NUMBER >= 10301
// Codec for CodecType::LZ4_FRAME, built on the LZ4F_* frame API. Holds a
// decompression context (dctx_) that resetDCtx() creates or replaces.
391 class LZ4FrameCodec final : public Codec {
// Factory with the (level, type) shape stored in the codecFactories table.
393 static std::unique_ptr<Codec> create(int level, CodecType type);
394 explicit LZ4FrameCodec(int level, CodecType type);
// Codec hooks.
398 std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
399 std::unique_ptr<IOBuf> doUncompress(
401 uint64_t uncompressedLength) override;
403 // Reset the dctx_ if it is dirty or null.
// Decompression context; null until resetDCtx() creates one.
407 LZ4F_decompressionContext_t dctx_{nullptr};
411 /* static */ std::unique_ptr<Codec> LZ4FrameCodec::create(
414 return make_unique<LZ4FrameCodec>(level, type);
417 static size_t lz4FrameThrowOnError(size_t code) {
418 if (LZ4F_isError(code)) {
419 throw std::runtime_error(
420 to<std::string>("LZ4Frame error: ", LZ4F_getErrorName(code)));
425 void LZ4FrameCodec::resetDCtx() {
426 if (dctx_ && !dirty_) {
430 LZ4F_freeDecompressionContext(dctx_);
432 lz4FrameThrowOnError(LZ4F_createDecompressionContext(&dctx_, 100));
// Maps the requested level onto the value later passed to LZ4F through
// prefs.compressionLevel; the COMPRESSION_LEVEL_* aliases are handled by the
// switch below.
436 LZ4FrameCodec::LZ4FrameCodec(int level, CodecType type) : Codec(type) {
437 DCHECK(type == CodecType::LZ4_FRAME);
439 case COMPRESSION_LEVEL_FASTEST:
440 case COMPRESSION_LEVEL_DEFAULT:
443 case COMPRESSION_LEVEL_BEST:
// Releases the LZ4F decompression context owned by this codec.
452 LZ4FrameCodec::~LZ4FrameCodec() {
454 LZ4F_freeDecompressionContext(dctx_);
// Compresses `data` into a single LZ4 frame with one LZ4F_compressFrame()
// call. The uncompressed size is recorded in the frame via
// prefs.frameInfo.contentSize.
458 std::unique_ptr<IOBuf> LZ4FrameCodec::doCompress(const IOBuf* data) {
459 // LZ4 Frame compression doesn't support streaming so we have to coalesce
461 if (data->isChained()) {
462 clone = data->cloneCoalescedAsValue();
466 const auto uncompressedLength = data->length();
// Value-initialized so every preference not set below stays zero.
467 LZ4F_preferences_t prefs{};
468 prefs.compressionLevel = level_;
469 prefs.frameInfo.contentSize = uncompressedLength;
// Size the output buffer from LZ4F's bound for these preferences.
471 auto buf = IOBuf::create(LZ4F_compressFrameBound(uncompressedLength, &prefs));
// LZ4F errors surface as std::runtime_error through lz4FrameThrowOnError().
472 const size_t written = lz4FrameThrowOnError(LZ4F_compressFrame(
478 buf->append(written);
482 std::unique_ptr<IOBuf> LZ4FrameCodec::doUncompress(
484 uint64_t uncompressedLength) {
485 // Reset the dctx if any errors have occurred
488 ByteRange in = *data->begin();
490 if (data->isChained()) {
491 clone = data->cloneCoalescedAsValue();
492 in = clone.coalesce();
495 // Select decompression options
496 LZ4F_decompressOptions_t options;
497 options.stableDst = 1;
498 // Select blockSize and growthSize for the IOBufQueue
499 IOBufQueue queue(IOBufQueue::cacheChainLength());
500 auto blockSize = uint64_t{64} << 10;
501 auto growthSize = uint64_t{4} << 20;
502 if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH) {
503 // Allocate uncompressedLength in one chunk (up to 64 MB)
504 const auto allocateSize = std::min(uncompressedLength, uint64_t{64} << 20);
505 queue.preallocate(allocateSize, allocateSize);
506 blockSize = std::min(uncompressedLength, blockSize);
507 growthSize = std::min(uncompressedLength, growthSize);
509 // Reduce growthSize for small data
510 const auto guessUncompressedLen = 4 * std::max(blockSize, in.size());
511 growthSize = std::min(guessUncompressedLen, growthSize);
513 // Once LZ4_decompress() is called, the dctx_ cannot be reused until it
516 // Decompress until the frame is over
519 // Allocate enough space to decompress at least a block
522 std::tie(out, outSize) = queue.preallocate(blockSize, growthSize);
524 size_t inSize = in.size();
525 code = lz4FrameThrowOnError(
526 LZ4F_decompress(dctx_, out, &outSize, in.data(), &inSize, &options));
527 if (in.empty() && outSize == 0 && code != 0) {
528 // We passed no input, no output was produced, and the frame isn't over
529 // No more forward progress is possible
530 throw std::runtime_error("LZ4Frame error: Incomplete frame");
532 in.uncheckedAdvance(inSize);
533 queue.postallocate(outSize);
535 // At this point the decompression context can be reused
537 if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
538 queue.chainLength() != uncompressedLength) {
539 throw std::runtime_error("LZ4Frame error: Invalid uncompressedLength");
544 #endif // LZ4_VERSION_NUMBER >= 10301
545 #endif // FOLLY_HAVE_LIBLZ4
547 #if FOLLY_HAVE_LIBSNAPPY
554 * Implementation of snappy::Source that reads from an IOBuf chain.
// Adapter that lets snappy read its input from an IOBuf chain.
556 class IOBufSnappySource final : public snappy::Source {
558 explicit IOBufSnappySource(const IOBuf* data);
// snappy::Source interface.
559 size_t Available() const override;
560 const char* Peek(size_t* len) override;
561 void Skip(size_t n) override;
// Starts with available_ equal to the total data length of the chain.
567 IOBufSnappySource::IOBufSnappySource(const IOBuf* data)
568 : available_(data->computeChainDataLength()),
// snappy::Source override; SnappyCodec::doCompress() uses the result to size
// its output buffer.
572 size_t IOBufSnappySource::Available() const {
576 const char* IOBufSnappySource::Peek(size_t* len) {
577 auto sp = StringPiece{cursor_.peekBytes()};
// snappy::Source override. Asking to skip more than available_ bytes is
// treated as a fatal error.
582 void IOBufSnappySource::Skip(size_t n) {
583 CHECK_LE(n, available_);
// Codec for CodecType::SNAPPY.
588 class SnappyCodec final : public Codec {
// Factory with the (level, type) shape stored in the codecFactories table.
590 static std::unique_ptr<Codec> create(int level, CodecType type);
591 explicit SnappyCodec(int level, CodecType type);
// Codec hooks.
594 uint64_t doMaxUncompressedLength() const override;
595 std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
596 std::unique_ptr<IOBuf> doUncompress(
598 uint64_t uncompressedLength) override;
601 std::unique_ptr<Codec> SnappyCodec::create(int level, CodecType type) {
602 return make_unique<SnappyCodec>(level, type);
// Validates the requested level. The COMPRESSION_LEVEL_* aliases are handled
// by the switch; a level that is still invalid afterwards raises
// std::invalid_argument.
605 SnappyCodec::SnappyCodec(int level, CodecType type) : Codec(type) {
606 DCHECK(type == CodecType::SNAPPY);
608 case COMPRESSION_LEVEL_FASTEST:
609 case COMPRESSION_LEVEL_DEFAULT:
610 case COMPRESSION_LEVEL_BEST:
614 throw std::invalid_argument(to<std::string>(
615 "SnappyCodec: invalid level: ", level));
619 uint64_t SnappyCodec::doMaxUncompressedLength() const {
620 // snappy.h uses uint32_t for lengths, so there's that.
621 return std::numeric_limits<uint32_t>::max();
624 std::unique_ptr<IOBuf> SnappyCodec::doCompress(const IOBuf* data) {
625 IOBufSnappySource source(data);
627 IOBuf::create(snappy::MaxCompressedLength(source.Available()));
629 snappy::UncheckedByteArraySink sink(reinterpret_cast<char*>(
630 out->writableTail()));
632 size_t n = snappy::Compress(&source, &sink);
634 CHECK_LE(n, out->capacity());
639 std::unique_ptr<IOBuf> SnappyCodec::doUncompress(const IOBuf* data,
640 uint64_t uncompressedLength) {
641 uint32_t actualUncompressedLength = 0;
644 IOBufSnappySource source(data);
645 if (!snappy::GetUncompressedLength(&source, &actualUncompressedLength)) {
646 throw std::runtime_error("snappy::GetUncompressedLength failed");
648 if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
649 uncompressedLength != actualUncompressedLength) {
650 throw std::runtime_error("snappy: invalid uncompressed length");
654 auto out = IOBuf::create(actualUncompressedLength);
657 IOBufSnappySource source(data);
658 if (!snappy::RawUncompress(&source,
659 reinterpret_cast<char*>(out->writableTail()))) {
660 throw std::runtime_error("snappy::RawUncompress failed");
664 out->append(actualUncompressedLength);
668 #endif // FOLLY_HAVE_LIBSNAPPY
// Codec for CodecType::ZLIB and CodecType::GZIP, built on zlib's
// deflate/inflate streaming API.
674 class ZlibCodec final : public Codec {
// Factory with the (level, type) shape stored in the codecFactories table.
676 static std::unique_ptr<Codec> create(int level, CodecType type);
677 explicit ZlibCodec(int level, CodecType type);
// Codec hooks.
680 std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
681 std::unique_ptr<IOBuf> doUncompress(
683 uint64_t uncompressedLength) override;
// Allocates a new output buffer of `length` bytes and points the stream's
// next_out/avail_out at it.
685 std::unique_ptr<IOBuf> addOutputBuffer(z_stream* stream, uint32_t length);
// Runs one inflate() step, chaining a new output buffer onto `head` first if
// the current one is full.
686 bool doInflate(z_stream* stream, IOBuf* head, uint32_t bufferLength);
691 std::unique_ptr<Codec> ZlibCodec::create(int level, CodecType type) {
692 return make_unique<ZlibCodec>(level, type);
// Maps the COMPRESSION_LEVEL_* aliases onto zlib levels and validates the
// result: Z_DEFAULT_COMPRESSION or a value in [0, 9]; anything else raises
// std::invalid_argument.
695 ZlibCodec::ZlibCodec(int level, CodecType type) : Codec(type) {
696 DCHECK(type == CodecType::ZLIB || type == CodecType::GZIP);
698 case COMPRESSION_LEVEL_FASTEST:
701 case COMPRESSION_LEVEL_DEFAULT:
702 level = Z_DEFAULT_COMPRESSION;
704 case COMPRESSION_LEVEL_BEST:
708 if (level != Z_DEFAULT_COMPRESSION && (level < 0 || level > 9)) {
709 throw std::invalid_argument(to<std::string>(
710 "ZlibCodec: invalid level: ", level));
715 std::unique_ptr<IOBuf> ZlibCodec::addOutputBuffer(z_stream* stream,
717 CHECK_EQ(stream->avail_out, 0);
719 auto buf = IOBuf::create(length);
722 stream->next_out = buf->writableData();
723 stream->avail_out = buf->length();
// Performs one inflate() step. If the current output buffer is full, a new
// one of `bufferLength` bytes is chained onto `head` first. The callers loop
// on the boolean result until it reports the end of the stream.
728 bool ZlibCodec::doInflate(z_stream* stream,
730 uint32_t bufferLength) {
731 if (stream->avail_out == 0) {
732 head->prependChain(addOutputBuffer(stream, bufferLength));
735 int rc = inflate(stream, Z_NO_FLUSH);
// Some inflate() return codes are reported as std::runtime_error...
746 throw std::runtime_error(to<std::string>(
747 "ZlibCodec: inflate error: ", rc, ": ", stream->msg));
// ...and anything unexpected is treated as a fatal programming error.
749 CHECK(false) << rc << ": " << stream->msg;
// Compresses an IOBuf chain with zlib's deflate, producing zlib- or
// gzip-framed output depending on type(). Input is fed range by range in
// steps of at most maxSingleStepLength, and output buffers are chained on
// demand.
755 std::unique_ptr<IOBuf> ZlibCodec::doCompress(const IOBuf* data) {
// Null allocator hooks select zlib's default allocation routines.
757 stream.zalloc = nullptr;
758 stream.zfree = nullptr;
759 stream.opaque = nullptr;
761 // Using deflateInit2() to support gzip. "The windowBits parameter is the
762 // base two logarithm of the maximum window size (...) The default value is
763 // 15 (...) Add 16 to windowBits to write a simple gzip header and trailer
764 // around the compressed data instead of a zlib wrapper. The gzip header
765 // will have no file name, no extra data, no comment, no modification time
766 // (set to zero), no header crc, and the operating system will be set to 255
768 int windowBits = 15 + (type() == CodecType::GZIP ? 16 : 0);
769 // All other parameters (method, memLevel, strategy) get default values from
771 int rc = deflateInit2(&stream,
778 throw std::runtime_error(to<std::string>(
779 "ZlibCodec: deflateInit error: ", rc, ": ", stream.msg));
// Start with no input or output attached; buffers are supplied below.
782 stream.next_in = stream.next_out = nullptr;
783 stream.avail_in = stream.avail_out = 0;
784 stream.total_in = stream.total_out = 0;
// Read by the deflateEnd() check below to tell a normal exit from an
// exceptional one.
786 bool success = false;
789 rc = deflateEnd(&stream);
790 // If we're here because of an exception, it's okay if some data
792 CHECK(rc == Z_OK || (!success && rc == Z_DATA_ERROR))
793 << rc << ": " << stream.msg;
796 uint64_t uncompressedLength = data->computeChainDataLength();
797 uint64_t maxCompressedLength = deflateBound(&stream, uncompressedLength);
799 // Max 64MiB in one go
800 constexpr uint32_t maxSingleStepLength = uint32_t(64) << 20; // 64MiB
801 constexpr uint32_t defaultBufferLength = uint32_t(4) << 20; // 4MiB
// First output buffer: the full deflate bound if it fits in one step,
// otherwise the default chunk size (more chunks are added as needed).
803 auto out = addOutputBuffer(
805 (maxCompressedLength <= maxSingleStepLength ?
806 maxCompressedLength :
807 defaultBufferLength));
809 for (auto& range : *data) {
810 uint64_t remaining = range.size();
811 uint64_t written = 0;
// Cap each step at maxSingleStepLength before storing it in avail_in.
813 uint32_t step = (remaining > maxSingleStepLength ?
814 maxSingleStepLength : remaining);
815 stream.next_in = const_cast<uint8_t*>(range.data() + written);
816 stream.avail_in = step;
// Drain this step, chaining another output buffer whenever the current one
// fills up.
820 while (stream.avail_in != 0) {
821 if (stream.avail_out == 0) {
822 out->prependChain(addOutputBuffer(&stream, defaultBufferLength));
825 rc = deflate(&stream, Z_NO_FLUSH);
827 CHECK_EQ(rc, Z_OK) << stream.msg;
// All input consumed: flush with Z_FINISH until zlib stops returning Z_OK.
833 if (stream.avail_out == 0) {
834 out->prependChain(addOutputBuffer(&stream, defaultBufferLength));
837 rc = deflate(&stream, Z_FINISH);
838 } while (rc == Z_OK);
840 CHECK_EQ(rc, Z_STREAM_END) << stream.msg;
// Drop the unused tail of the last buffer in the chain.
842 out->prev()->trimEnd(stream.avail_out);
844 success = true; // we survived
// Decompresses a zlib- or gzip-framed IOBuf chain (selected by type()) with
// zlib's inflate. Throws std::runtime_error on inflate errors, on trailing
// data after the end of the stream, and when the produced size differs from
// a caller-supplied uncompressedLength.
849 std::unique_ptr<IOBuf> ZlibCodec::doUncompress(const IOBuf* data,
850 uint64_t uncompressedLength) {
// Null allocator hooks select zlib's default allocation routines.
852 stream.zalloc = nullptr;
853 stream.zfree = nullptr;
854 stream.opaque = nullptr;
856 // "The windowBits parameter is the base two logarithm of the maximum window
857 // size (...) The default value is 15 (...) add 16 to decode only the gzip
858 // format (the zlib format will return a Z_DATA_ERROR)."
859 int windowBits = 15 + (type() == CodecType::GZIP ? 16 : 0);
860 int rc = inflateInit2(&stream, windowBits);
862 throw std::runtime_error(to<std::string>(
863 "ZlibCodec: inflateInit error: ", rc, ": ", stream.msg));
// Start with no input or output attached; buffers are supplied below.
866 stream.next_in = stream.next_out = nullptr;
867 stream.avail_in = stream.avail_out = 0;
868 stream.total_in = stream.total_out = 0;
// Read by the inflateEnd() check below to tell a normal exit from an
// exceptional one.
870 bool success = false;
873 rc = inflateEnd(&stream);
874 // If we're here because of an exception, it's okay if some data
876 CHECK(rc == Z_OK || (!success && rc == Z_DATA_ERROR))
877 << rc << ": " << stream.msg;
880 // Max 64MiB in one go
881 constexpr uint32_t maxSingleStepLength = uint32_t(64) << 20; // 64MiB
882 constexpr uint32_t defaultBufferLength = uint32_t(4) << 20; // 4MiB
// First output buffer: sized from the expected length when it is known and
// fits in one step, otherwise the default chunk size.
884 auto out = addOutputBuffer(
886 ((uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
887 uncompressedLength <= maxSingleStepLength) ?
889 defaultBufferLength));
891 bool streamEnd = false;
892 for (auto& range : *data) {
// NOTE(review): unlike doCompress(), the whole range size is stored in
// avail_in without being capped at maxSingleStepLength -- confirm very large
// ranges cannot be narrowed here.
897 stream.next_in = const_cast<uint8_t*>(range.data());
898 stream.avail_in = range.size();
900 while (stream.avail_in != 0) {
// Input bytes remaining after the stream has ended are rejected.
902 throw std::runtime_error(to<std::string>(
903 "ZlibCodec: junk after end of data"));
906 streamEnd = doInflate(&stream, out.get(), defaultBufferLength);
// Input exhausted: keep inflating so zlib can emit any pending output.
911 streamEnd = doInflate(&stream, out.get(), defaultBufferLength);
// Drop the unused tail of the last buffer in the chain.
914 out->prev()->trimEnd(stream.avail_out);
916 if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
917 uncompressedLength != stream.total_out) {
918 throw std::runtime_error(to<std::string>(
919 "ZlibCodec: invalid uncompressed length"));
922 success = true; // we survived
927 #endif // FOLLY_HAVE_LIBZ
929 #if FOLLY_HAVE_LIBLZMA
// Codec for CodecType::LZMA2 and CodecType::LZMA2_VARINT_SIZE, built on
// liblzma's streaming API.
934 class LZMA2Codec final : public Codec {
// Factory with the (level, type) shape stored in the codecFactories table.
936 static std::unique_ptr<Codec> create(int level, CodecType type);
937 explicit LZMA2Codec(int level, CodecType type);
// Codec hooks.
940 bool doNeedsUncompressedLength() const override;
941 uint64_t doMaxUncompressedLength() const override;
// True for the LZMA2_VARINT_SIZE flavour, where the uncompressed size is
// stored as a varint in front of the compressed stream.
943 bool encodeSize() const { return type() == CodecType::LZMA2_VARINT_SIZE; }
945 std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
946 std::unique_ptr<IOBuf> doUncompress(
948 uint64_t uncompressedLength) override;
// Allocates a new output buffer of `length` bytes and points the stream's
// next_out/avail_out at it.
950 std::unique_ptr<IOBuf> addOutputBuffer(lzma_stream* stream, size_t length);
// Runs one lzma_code() step, chaining a new output buffer onto `head` first
// if the current one is full.
951 bool doInflate(lzma_stream* stream, IOBuf* head, size_t bufferLength);
956 std::unique_ptr<Codec> LZMA2Codec::create(int level, CodecType type) {
957 return make_unique<LZMA2Codec>(level, type);
// Maps the COMPRESSION_LEVEL_* aliases onto liblzma presets and validates
// the result: it must lie in [0, 9], otherwise std::invalid_argument is
// thrown.
960 LZMA2Codec::LZMA2Codec(int level, CodecType type) : Codec(type) {
961 DCHECK(type == CodecType::LZMA2 || type == CodecType::LZMA2_VARINT_SIZE);
963 case COMPRESSION_LEVEL_FASTEST:
966 case COMPRESSION_LEVEL_DEFAULT:
967 level = LZMA_PRESET_DEFAULT;
969 case COMPRESSION_LEVEL_BEST:
973 if (level < 0 || level > 9) {
974 throw std::invalid_argument(to<std::string>(
975 "LZMA2Codec: invalid level: ", level));
// LZMA2Codec's override of the hook behind Codec::needsUncompressedLength().
980 bool LZMA2Codec::doNeedsUncompressedLength() const {
984 uint64_t LZMA2Codec::doMaxUncompressedLength() const {
985 // From lzma/base.h: "Stream is roughly 8 EiB (2^63 bytes)"
986 return uint64_t(1) << 63;
989 std::unique_ptr<IOBuf> LZMA2Codec::addOutputBuffer(
993 CHECK_EQ(stream->avail_out, 0);
995 auto buf = IOBuf::create(length);
998 stream->next_out = buf->writableData();
999 stream->avail_out = buf->length();
// Compresses an IOBuf chain with liblzma's easy encoder at level_. Output
// buffers are chained on demand, and a varint length header is built with
// encodeVarintToIOBuf() and placed ahead of the compressed data. liblzma
// failures are reported as std::runtime_error.
1004 std::unique_ptr<IOBuf> LZMA2Codec::doCompress(const IOBuf* data) {
1006 lzma_stream stream = LZMA_STREAM_INIT;
// LZMA_CHECK_NONE: the stream is written without an integrity check.
1008 rc = lzma_easy_encoder(&stream, level_, LZMA_CHECK_NONE);
1009 if (rc != LZMA_OK) {
1010 throw std::runtime_error(folly::to<std::string>(
1011 "LZMA2Codec: lzma_easy_encoder error: ", rc));
// Release the encoder on every exit path.
1014 SCOPE_EXIT { lzma_end(&stream); };
1016 uint64_t uncompressedLength = data->computeChainDataLength();
1017 uint64_t maxCompressedLength = lzma_stream_buffer_bound(uncompressedLength);
1019 // Max 64MiB in one go
1020 constexpr uint32_t maxSingleStepLength = uint32_t(64) << 20; // 64MiB
1021 constexpr uint32_t defaultBufferLength = uint32_t(4) << 20; // 4MiB
// First output buffer: the full bound if it fits in one step, otherwise the
// default chunk size.
1023 auto out = addOutputBuffer(
1025 (maxCompressedLength <= maxSingleStepLength ?
1026 maxCompressedLength :
1027 defaultBufferLength));
// Build the varint size header in its own small buffer and make it the head
// of the output chain.
1030 auto size = IOBuf::createCombined(kMaxVarintLength64);
1031 encodeVarintToIOBuf(uncompressedLength, size.get());
1032 size->appendChain(std::move(out));
1033 out = std::move(size);
1036 for (auto& range : *data) {
1037 if (range.empty()) {
1041 stream.next_in = const_cast<uint8_t*>(range.data());
1042 stream.avail_in = range.size();
// Drain this range, chaining another output buffer whenever the current one
// fills up.
1044 while (stream.avail_in != 0) {
1045 if (stream.avail_out == 0) {
1046 out->prependChain(addOutputBuffer(&stream, defaultBufferLength));
1049 rc = lzma_code(&stream, LZMA_RUN);
1051 if (rc != LZMA_OK) {
1052 throw std::runtime_error(folly::to<std::string>(
1053 "LZMA2Codec: lzma_code error: ", rc));
// All input consumed: flush with LZMA_FINISH until liblzma stops returning
// LZMA_OK.
1059 if (stream.avail_out == 0) {
1060 out->prependChain(addOutputBuffer(&stream, defaultBufferLength));
1063 rc = lzma_code(&stream, LZMA_FINISH);
1064 } while (rc == LZMA_OK);
1066 if (rc != LZMA_STREAM_END) {
1067 throw std::runtime_error(folly::to<std::string>(
1068 "LZMA2Codec: lzma_code ended with error: ", rc));
// Drop the unused tail of the last buffer in the chain.
1071 out->prev()->trimEnd(stream.avail_out);
// Performs one lzma_code(LZMA_RUN) step. If the current output buffer is
// full, a new one of `bufferLength` bytes is chained onto `head` first. The
// callers loop on the boolean result until it reports the end of the stream.
// Unexpected return codes are reported as std::runtime_error.
1076 bool LZMA2Codec::doInflate(lzma_stream* stream,
1078 size_t bufferLength) {
1079 if (stream->avail_out == 0) {
1080 head->prependChain(addOutputBuffer(stream, bufferLength));
1083 lzma_ret rc = lzma_code(stream, LZMA_RUN);
1088 case LZMA_STREAM_END:
1091 throw std::runtime_error(to<std::string>(
1092 "LZMA2Codec: lzma_code error: ", rc));
// Decompresses an LZMA2 stream from an IOBuf chain. A varint length header
// is read with decodeVarintFromCursor() and cross-checked against the
// caller's value. Throws std::runtime_error on decoder errors, on trailing
// data after the end of the stream, and on a length mismatch.
1098 std::unique_ptr<IOBuf> LZMA2Codec::doUncompress(const IOBuf* data,
1099 uint64_t uncompressedLength) {
1101 lzma_stream stream = LZMA_STREAM_INIT;
// The memory limit argument is the largest uint64_t value; flags are 0.
1103 rc = lzma_auto_decoder(&stream, std::numeric_limits<uint64_t>::max(), 0);
1104 if (rc != LZMA_OK) {
1105 throw std::runtime_error(folly::to<std::string>(
1106 "LZMA2Codec: lzma_auto_decoder error: ", rc));
// Release the decoder on every exit path.
1109 SCOPE_EXIT { lzma_end(&stream); };
1111 // Max 64MiB in one go
1112 constexpr uint32_t maxSingleStepLength = uint32_t(64) << 20; // 64MiB
1113 constexpr uint32_t defaultBufferLength = uint32_t(256) << 10; // 256 KiB
1115 folly::io::Cursor cursor(data);
// The embedded size must agree with the caller's value when one was given;
// from here on it is the expected length.
1117 const uint64_t actualUncompressedLength = decodeVarintFromCursor(cursor);
1118 if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
1119 uncompressedLength != actualUncompressedLength) {
1120 throw std::runtime_error("LZMA2Codec: invalid uncompressed length");
1122 uncompressedLength = actualUncompressedLength;
// First output buffer: sized from the expected length when it is known and
// fits in one step, otherwise the default chunk size.
1125 auto out = addOutputBuffer(
1127 ((uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
1128 uncompressedLength <= maxSingleStepLength)
1129 ? uncompressedLength
1130 : defaultBufferLength));
1132 bool streamEnd = false;
// Walk the remaining input one contiguous span at a time.
1133 auto buf = cursor.peekBytes();
1134 while (!buf.empty()) {
1135 stream.next_in = const_cast<uint8_t*>(buf.data());
1136 stream.avail_in = buf.size();
1138 while (stream.avail_in != 0) {
// Input bytes remaining after the stream has ended are rejected.
1140 throw std::runtime_error(to<std::string>(
1141 "LZMA2Codec: junk after end of data"));
1144 streamEnd = doInflate(&stream, out.get(), defaultBufferLength);
1147 cursor.skip(buf.size());
1148 buf = cursor.peekBytes();
// Input exhausted: keep decoding so liblzma can emit any pending output.
1151 while (!streamEnd) {
1152 streamEnd = doInflate(&stream, out.get(), defaultBufferLength);
// Drop the unused tail of the last buffer in the chain.
1155 out->prev()->trimEnd(stream.avail_out);
1157 if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
1158 uncompressedLength != stream.total_out) {
1159 throw std::runtime_error(
1160 to<std::string>("LZMA2Codec: invalid uncompressed length"));
1166 #endif // FOLLY_HAVE_LIBLZMA
1168 #ifdef FOLLY_HAVE_LIBZSTD
// Codec for CodecType::ZSTD.
1173 class ZSTDCodec final : public Codec {
// Factory with the (level, type) shape stored in the codecFactories table.
1175 static std::unique_ptr<Codec> create(int level, CodecType);
1176 explicit ZSTDCodec(int level, CodecType type);
// Codec hooks.
1179 bool doNeedsUncompressedLength() const override;
1180 std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
1181 std::unique_ptr<IOBuf> doUncompress(
1183 uint64_t uncompressedLength) override;
1188 std::unique_ptr<Codec> ZSTDCodec::create(int level, CodecType type) {
1189 return make_unique<ZSTDCodec>(level, type);
// Maps the COMPRESSION_LEVEL_* aliases onto zstd levels and validates the
// result against [1, ZSTD_maxCLevel()]; anything else raises
// std::invalid_argument.
1192 ZSTDCodec::ZSTDCodec(int level, CodecType type) : Codec(type) {
1193 DCHECK(type == CodecType::ZSTD);
1195 case COMPRESSION_LEVEL_FASTEST:
1198 case COMPRESSION_LEVEL_DEFAULT:
1201 case COMPRESSION_LEVEL_BEST:
1205 if (level < 1 || level > ZSTD_maxCLevel()) {
1206 throw std::invalid_argument(
1207 to<std::string>("ZSTD: invalid level: ", level));
// ZSTDCodec's override of the hook behind Codec::needsUncompressedLength().
1212 bool ZSTDCodec::doNeedsUncompressedLength() const {
1216 void zstdThrowIfError(size_t rc) {
1217 if (!ZSTD_isError(rc)) {
1220 throw std::runtime_error(
1221 to<std::string>("ZSTD returned an error: ", ZSTD_getErrorName(rc)));
// Compresses `data` with zstd at level_. A single (unchained) IOBuf takes
// the one-shot ZSTD_compress() path; chained input goes through the
// streaming API, writing into one output buffer sized by
// ZSTD_compressBound(). zstd errors are reported as std::runtime_error.
1224 std::unique_ptr<IOBuf> ZSTDCodec::doCompress(const IOBuf* data) {
1225 // Support earlier versions of the codec (working with a single IOBuf,
1226 // and using ZSTD_decompress which requires ZSTD frame to contain size,
1227 // which isn't populated by streaming API).
1228 if (!data->isChained()) {
1229 auto out = IOBuf::createCombined(ZSTD_compressBound(data->length()));
1230 const auto rc = ZSTD_compress(
1231 out->writableData(),
1236 zstdThrowIfError(rc);
// NOTE(review): the stream handle is used without a visible null check --
// confirm ZSTD_createCStream() cannot fail here.
1241 auto zcs = ZSTD_createCStream();
1243 ZSTD_freeCStream(zcs);
1246 auto rc = ZSTD_initCStream(zcs, level_);
1247 zstdThrowIfError(rc);
1249 Cursor cursor(data);
1250 auto result = IOBuf::createCombined(ZSTD_compressBound(cursor.totalLength()));
1253 out.dst = result->writableTail();
1254 out.size = result->capacity();
// Feed the chain one contiguous span at a time until the cursor is drained.
1257 for (auto buffer = cursor.peekBytes(); !buffer.empty();) {
1259 in.src = buffer.data();
1260 in.size = buffer.size();
// Keep calling ZSTD_compressStream() until this span is fully consumed.
1261 for (in.pos = 0; in.pos != in.size;) {
1262 rc = ZSTD_compressStream(zcs, &out, &in);
1263 zstdThrowIfError(rc);
1265 cursor.skip(in.size);
1266 buffer = cursor.peekBytes();
// Finish the stream after the last input span.
1269 rc = ZSTD_endStream(zcs, &out);
1270 zstdThrowIfError(rc);
1273 result->append(out.pos);
1277 static std::unique_ptr<IOBuf> zstdUncompressBuffer(
1279 uint64_t uncompressedLength) {
1280 // Check preconditions
1281 DCHECK(!data->isChained());
1282 DCHECK(uncompressedLength != Codec::UNKNOWN_UNCOMPRESSED_LENGTH);
1284 auto uncompressed = IOBuf::create(uncompressedLength);
1285 const auto decompressedSize = ZSTD_decompress(
1286 uncompressed->writableTail(),
1287 uncompressed->tailroom(),
1290 zstdThrowIfError(decompressedSize);
1291 if (decompressedSize != uncompressedLength) {
1292 throw std::runtime_error("ZSTD: invalid uncompressed length");
1294 uncompressed->append(decompressedSize);
1295 return uncompressed;
// Streaming decompression for chained input or unknown output size. Input is
// pulled from the chain span by span, and output is gathered in an
// IOBufQueue. Throws std::runtime_error on zstd errors, incomplete input,
// trailing data, or a size mismatch against a known uncompressedLength.
1298 static std::unique_ptr<IOBuf> zstdUncompressStream(
1300 uint64_t uncompressedLength) {
1301 auto zds = ZSTD_createDStream();
1303 ZSTD_freeDStream(zds);
1306 auto rc = ZSTD_initDStream(zds);
1307 zstdThrowIfError(rc);
1309 ZSTD_outBuffer out{};
// First allocation: the expected size when known, otherwise zstd's
// recommended output chunk size.
1312 auto outputSize = ZSTD_DStreamOutSize();
1313 if (uncompressedLength != Codec::UNKNOWN_UNCOMPRESSED_LENGTH) {
1314 outputSize = uncompressedLength;
1317 IOBufQueue queue(IOBufQueue::cacheChainLength());
1319 Cursor cursor(data);
// Refill the input window once the previous span has been fully consumed.
1321 if (in.pos == in.size) {
1322 auto buffer = cursor.peekBytes();
1323 in.src = buffer.data();
1324 in.size = buffer.size();
1326 cursor.skip(in.size);
// Reject the case where rc > 1 but the chain has no more bytes to offer.
1327 if (rc > 1 && in.size == 0) {
1328 throw std::runtime_error(to<std::string>("ZSTD: incomplete input"));
// Output window full: commit it to the queue and reserve the next one.
1331 if (out.pos == out.size) {
1333 queue.postallocate(out.pos);
1335 auto buffer = queue.preallocate(outputSize, outputSize);
1336 out.dst = buffer.first;
1337 out.size = buffer.second;
// Later allocations use zstd's recommended chunk size.
1339 outputSize = ZSTD_DStreamOutSize();
1341 rc = ZSTD_decompressStream(zds, &out, &in);
1342 zstdThrowIfError(rc);
// Commit whatever was written into the last output window.
1348 queue.postallocate(out.pos);
1350 if (in.pos != in.size || !cursor.isAtEnd()) {
1351 throw std::runtime_error("ZSTD: junk after end of data");
1353 if (uncompressedLength != Codec::UNKNOWN_UNCOMPRESSED_LENGTH &&
1354 queue.chainLength() != uncompressedLength) {
1355 throw std::runtime_error("ZSTD: invalid uncompressed length");
1358 return queue.move();
1361 std::unique_ptr<IOBuf> ZSTDCodec::doUncompress(
1363 uint64_t uncompressedLength) {
1365 // Read decompressed size from frame if available in first IOBuf.
1366 const auto decompressedSize =
1367 ZSTD_getDecompressedSize(data->data(), data->length());
1368 if (decompressedSize != 0) {
1369 if (uncompressedLength != Codec::UNKNOWN_UNCOMPRESSED_LENGTH &&
1370 uncompressedLength != decompressedSize) {
1371 throw std::runtime_error("ZSTD: invalid uncompressed length");
1373 uncompressedLength = decompressedSize;
1376 // Faster to decompress using ZSTD_decompress() if we can.
1377 if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH && !data->isChained()) {
1378 return zstdUncompressBuffer(data, uncompressedLength);
1380 // Fall back to slower streaming decompression.
1381 return zstdUncompressStream(data, uncompressedLength);
1384 #endif // FOLLY_HAVE_LIBZSTD
// Factory signature shared by every codec's static create(): takes the
// compression level and the codec type, returns the new codec.
1388 typedef std::unique_ptr<Codec> (*CodecFactory)(int, CodecType);
// Table of factories indexed by the numeric value of CodecType (see
// hasCodec()/getCodec()). A nullptr slot means the codec is unavailable;
// the FOLLY_HAVE_* guards decide which library-backed codecs are present.
1389 static constexpr CodecFactory
1390 codecFactories[static_cast<size_t>(CodecType::NUM_CODEC_TYPES)] = {
1391 nullptr, // USER_DEFINED
1392 NoCompressionCodec::create,
1394 #if FOLLY_HAVE_LIBLZ4
1400 #if FOLLY_HAVE_LIBSNAPPY
1401 SnappyCodec::create,
1412 #if FOLLY_HAVE_LIBLZ4
1418 #if FOLLY_HAVE_LIBLZMA
1426 #if FOLLY_HAVE_LIBZSTD
// The frame codec additionally requires lz4 >= 1.3.1.
1438 #if (FOLLY_HAVE_LIBLZ4 && LZ4_VERSION_NUMBER >= 10301)
1439 LZ4FrameCodec::create,
1445 bool hasCodec(CodecType type) {
1446 size_t idx = static_cast<size_t>(type);
1447 if (idx >= static_cast<size_t>(CodecType::NUM_CODEC_TYPES)) {
1448 throw std::invalid_argument(
1449 to<std::string>("Compression type ", idx, " invalid"));
1451 return codecFactories[idx] != nullptr;
1454 std::unique_ptr<Codec> getCodec(CodecType type, int level) {
1455 size_t idx = static_cast<size_t>(type);
1456 if (idx >= static_cast<size_t>(CodecType::NUM_CODEC_TYPES)) {
1457 throw std::invalid_argument(
1458 to<std::string>("Compression type ", idx, " invalid"));
1460 auto factory = codecFactories[idx];
1462 throw std::invalid_argument(to<std::string>(
1463 "Compression type ", idx, " not supported"));
1465 auto codec = (*factory)(level, type);
1466 DCHECK_EQ(static_cast<size_t>(codec->type()), idx);