2 * Copyright 2017 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #include <folly/io/Compression.h>
25 #include <glog/logging.h>
27 #if FOLLY_HAVE_LIBSNAPPY
29 #include <snappy-sinksource.h>
36 #if FOLLY_HAVE_LIBLZMA
40 #if FOLLY_HAVE_LIBZSTD
44 #include <folly/Conv.h>
45 #include <folly/Memory.h>
46 #include <folly/Portability.h>
47 #include <folly/ScopeGuard.h>
48 #include <folly/Varint.h>
49 #include <folly/io/Cursor.h>
51 namespace folly { namespace io {
// Base-class constructor: records the CodecType this codec implements.
53 Codec::Codec(CodecType type) : type_(type) { }
// Codec::compress(const IOBuf*): compresses an IOBuf chain.
// Throws std::runtime_error when the chain is longer than
// maxUncompressedLength(); otherwise delegates to the subclass doCompress().
// NOTE(review): `data` is dereferenced unconditionally on the first line, so a
// null pointer is not handled here; the "nullptr case" wording presumably
// refers to empty input.
55 // Ensure consistent behavior in the nullptr case
56 std::unique_ptr<IOBuf> Codec::compress(const IOBuf* data) {
57 uint64_t len = data->computeChainDataLength();
// NOTE(review): the embedded numbering jumps 57 -> 59, so the condition that
// guards this early empty-buffer return (presumably `len == 0`) is missing
// from this copy.
59 return IOBuf::create(0);
61 if (len > maxUncompressedLength()) {
62 throw std::runtime_error("Codec: uncompressed length too large");
65 return doCompress(data);
// Codec::compress(StringPiece): applies the same size limit as the IOBuf
// overload, then delegates to doCompressString().
68 std::string Codec::compress(const StringPiece data) {
69 const uint64_t len = data.size();
73 if (len > maxUncompressedLength()) {
74 throw std::runtime_error("Codec: uncompressed length too large");
77 return doCompressString(data);
// Codec::uncompress(const IOBuf*, uint64_t): validates the caller-supplied
// uncompressedLength, then delegates to the subclass doUncompress().
//  - UNKNOWN_UNCOMPRESSED_LENGTH is rejected with std::invalid_argument when
//    needsUncompressedLength() is true.
//  - A known length above maxUncompressedLength() throws std::runtime_error.
80 std::unique_ptr<IOBuf> Codec::uncompress(const IOBuf* data,
81 uint64_t uncompressedLength) {
82 if (uncompressedLength == UNKNOWN_UNCOMPRESSED_LENGTH) {
83 if (needsUncompressedLength()) {
84 throw std::invalid_argument("Codec: uncompressed length required");
86 } else if (uncompressedLength > maxUncompressedLength()) {
87 throw std::runtime_error("Codec: uncompressed length too large");
// NOTE(review): numbering jumps 87 -> 91; the condition enclosing the next
// lines (presumably an empty-input check) is missing from this copy. Inside
// it, any known length other than 0 is rejected and an empty buffer returned.
91 if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
92 uncompressedLength != 0) {
93 throw std::runtime_error("Codec: invalid uncompressed length");
95 return IOBuf::create(0);
98 return doUncompress(data, uncompressedLength);
// Codec::uncompress(StringPiece, uint64_t): same validation as the IOBuf
// overload above, then delegates to doUncompressString().
101 std::string Codec::uncompress(
102 const StringPiece data,
103 uint64_t uncompressedLength) {
104 if (uncompressedLength == UNKNOWN_UNCOMPRESSED_LENGTH) {
105 if (needsUncompressedLength()) {
106 throw std::invalid_argument("Codec: uncompressed length required");
108 } else if (uncompressedLength > maxUncompressedLength()) {
109 throw std::runtime_error("Codec: uncompressed length too large");
// NOTE(review): the enclosing condition (presumably an empty-input check) and
// its return statement are absent from this copy (numbering gaps 109 -> 113
// and 115 -> 120).
113 if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
114 uncompressedLength != 0) {
115 throw std::runtime_error("Codec: invalid uncompressed length");
120 return doUncompressString(data, uncompressedLength);
// Public query: whether uncompress() requires the caller to pass the
// uncompressed length. Forwards to the virtual doNeedsUncompressedLength().
123 bool Codec::needsUncompressedLength() const {
124 return doNeedsUncompressedLength();
// Public query: the largest uncompressed size this codec accepts. Forwards to
// the virtual doMaxUncompressedLength().
127 uint64_t Codec::maxUncompressedLength() const {
128 return doMaxUncompressedLength();
// Default hook for needsUncompressedLength(); its return statement is not
// present in this copy (numbering jumps 131 -> 135).
131 bool Codec::doNeedsUncompressedLength() const {
// Default hook for maxUncompressedLength(): UNLIMITED_UNCOMPRESSED_LENGTH.
135 uint64_t Codec::doMaxUncompressedLength() const {
136 return UNLIMITED_UNCOMPRESSED_LENGTH;
// Default string compression:
//  - wraps the input bytes in an IOBuf (IOBuf::WRAP_BUFFER);
//  - runs doCompress();
//  - copies each buffer of the resulting chain into a std::string reserved
//    to the chain's total length.
// NOTE(review): the declaration of `output` and the return statement are not
// present in this copy (numbering gaps 141 -> 143 and 145 -> 150).
139 std::string Codec::doCompressString(const StringPiece data) {
140 const IOBuf inputBuffer{IOBuf::WRAP_BUFFER, data};
141 auto outputBuffer = doCompress(&inputBuffer);
143 output.reserve(outputBuffer->computeChainDataLength());
144 for (auto range : *outputBuffer) {
145 output.append(reinterpret_cast<const char*>(range.data()), range.size());
// Default string decompression: the same wrap / copy-out pattern around
// doUncompress().
150 std::string Codec::doUncompressString(
151 const StringPiece data,
152 uint64_t uncompressedLength) {
153 const IOBuf inputBuffer{IOBuf::WRAP_BUFFER, data};
154 auto outputBuffer = doUncompress(&inputBuffer, uncompressedLength);
156 output.reserve(outputBuffer->computeChainDataLength());
157 for (auto range : *outputBuffer) {
158 output.append(reinterpret_cast<const char*>(range.data()), range.size());
// Pass-through codec for CodecType::NO_COMPRESSION. Both "compressing" and
// "uncompressing" return a clone of the input chain.
168 class NoCompressionCodec final : public Codec {
170 static std::unique_ptr<Codec> create(int level, CodecType type);
171 explicit NoCompressionCodec(int level, CodecType type);
174 std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
175 std::unique_ptr<IOBuf> doUncompress(
177 uint64_t uncompressedLength) override;
// Factory referenced from the codecFactories table.
180 std::unique_ptr<Codec> NoCompressionCodec::create(int level, CodecType type) {
181 return make_unique<NoCompressionCodec>(level, type);
// Accepts the symbolic COMPRESSION_LEVEL_* values listed below. Other levels
// presumably reach the std::invalid_argument throw via a default label; the
// switch header and the break/default lines are not present in this copy.
184 NoCompressionCodec::NoCompressionCodec(int level, CodecType type)
186 DCHECK(type == CodecType::NO_COMPRESSION);
188 case COMPRESSION_LEVEL_DEFAULT:
189 case COMPRESSION_LEVEL_FASTEST:
190 case COMPRESSION_LEVEL_BEST:
194 throw std::invalid_argument(to<std::string>(
195 "NoCompressionCodec: invalid level ", level));
// "Compression" is a clone of the input.
199 std::unique_ptr<IOBuf> NoCompressionCodec::doCompress(
201 return data->clone();
// Verifies that a known uncompressedLength equals the input size, then
// returns a clone of the input.
204 std::unique_ptr<IOBuf> NoCompressionCodec::doUncompress(
206 uint64_t uncompressedLength) {
207 if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
208 data->computeChainDataLength() != uncompressedLength) {
209 throw std::runtime_error(to<std::string>(
210 "NoCompressionCodec: invalid uncompressed length"));
212 return data->clone();
215 #if (FOLLY_HAVE_LIBLZ4 || FOLLY_HAVE_LIBLZMA)
// Appends the varint encoding of `val` to `out`. The caller must have left at
// least kMaxVarintLength64 bytes of tailroom (DCHECKed).
219 void encodeVarintToIOBuf(uint64_t val, folly::IOBuf* out) {
220 DCHECK_GE(out->tailroom(), kMaxVarintLength64);
221 out->append(encodeVarint(val, out->writableTail()));
// Reads a varint from `cursor`. Each byte contributes its low 7 bits at a
// shift that grows by 7 (least-significant group first), up to shift 63.
// Throws std::invalid_argument("Invalid varint value. Too big.") presumably
// when no terminating byte is seen within that range.
// NOTE(review): the declarations of `val`/`b`, the loop-exit test and the
// condition guarding the throw are not present in this copy (numbering gaps
// 224 -> 227 and 229 -> 235).
224 inline uint64_t decodeVarintFromCursor(folly::io::Cursor& cursor) {
227 for (int shift = 0; shift <= 63; shift += 7) {
228 b = cursor.read<int8_t>();
229 val |= static_cast<uint64_t>(b & 0x7f) << shift;
235 throw std::invalid_argument("Invalid varint value. Too big.");
242 #endif // FOLLY_HAVE_LIBLZ4 || FOLLY_HAVE_LIBLZMA
244 #if FOLLY_HAVE_LIBLZ4
// LZ4 block-format codec for CodecType::LZ4 and CodecType::LZ4_VARINT_SIZE.
// For LZ4_VARINT_SIZE (encodeSize() == true) the uncompressed size travels
// with the data, so callers need not supply it. Plain LZ4 requires it.
249 class LZ4Codec final : public Codec {
251 static std::unique_ptr<Codec> create(int level, CodecType type);
252 explicit LZ4Codec(int level, CodecType type);
255 bool doNeedsUncompressedLength() const override;
256 uint64_t doMaxUncompressedLength() const override;
258 bool encodeSize() const { return type() == CodecType::LZ4_VARINT_SIZE; }
260 std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
261 std::unique_ptr<IOBuf> doUncompress(
263 uint64_t uncompressedLength) override;
// When true, doCompress() uses the LZ4 HC entry points instead of the fast
// ones.
265 bool highCompression_;
268 std::unique_ptr<Codec> LZ4Codec::create(int level, CodecType type) {
269 return make_unique<LZ4Codec>(level, type);
// Maps the symbolic COMPRESSION_LEVEL_* values (the per-case assignments are
// not present in this copy). The resulting level must be 1 or 2, else
// std::invalid_argument is thrown. Level 2 enables high compression.
272 LZ4Codec::LZ4Codec(int level, CodecType type) : Codec(type) {
273 DCHECK(type == CodecType::LZ4 || type == CodecType::LZ4_VARINT_SIZE);
276 case COMPRESSION_LEVEL_FASTEST:
277 case COMPRESSION_LEVEL_DEFAULT:
280 case COMPRESSION_LEVEL_BEST:
284 if (level < 1 || level > 2) {
285 throw std::invalid_argument(to<std::string>(
286 "LZ4Codec: invalid level: ", level));
288 highCompression_ = (level > 1);
// The caller must supply the length exactly when it is not encoded in the
// stream.
291 bool LZ4Codec::doNeedsUncompressedLength() const {
292 return !encodeSize();
295 // The value comes from lz4.h in lz4-r117, but older versions of lz4 don't
296 // define LZ4_MAX_INPUT_SIZE (even though the max size is the same), so
// provide a fallback definition here.
298 #ifndef LZ4_MAX_INPUT_SIZE
299 # define LZ4_MAX_INPUT_SIZE 0x7E000000
// NOTE(review): the #endif closing the guard above is not present in this
// copy (numbering jumps 299 -> 302).
// Largest input accepted by this codec: LZ4_MAX_INPUT_SIZE.
302 uint64_t LZ4Codec::doMaxUncompressedLength() const {
303 return LZ4_MAX_INPUT_SIZE;
// Compresses `data` with the LZ4 block API into a single IOBuf.
// - Chained input is first coalesced into a contiguous clone.
// - The output is sized to LZ4_compressBound() of the input, plus
//   kMaxVarintLength64 when encodeSize().
// - In that encodeSize() case the input length is written as a varint before
//   the compressed bytes. The guarding condition on line 316 is not present
//   in this copy.
306 std::unique_ptr<IOBuf> LZ4Codec::doCompress(const IOBuf* data) {
308 if (data->isChained()) {
309 // LZ4 doesn't support streaming, so we have to coalesce
310 clone = data->cloneCoalescedAsValue();
314 uint32_t extraSize = encodeSize() ? kMaxVarintLength64 : 0;
315 auto out = IOBuf::create(extraSize + LZ4_compressBound(data->length()));
317 encodeVarintToIOBuf(data->length(), out.get());
321 auto input = reinterpret_cast<const char*>(data->data());
322 auto output = reinterpret_cast<char*>(out->writableTail());
323 const auto inputLength = data->length();
// lz4 >= 1.7.0: entry points that take the output capacity. LZ4_compress_HC()
// is called with compression level 0.
324 #if LZ4_VERSION_NUMBER >= 10700
325 if (highCompression_) {
326 n = LZ4_compress_HC(input, output, inputLength, out->tailroom(), 0);
328 n = LZ4_compress_default(input, output, inputLength, out->tailroom());
// Older lz4 path: these legacy entry points take no output bound, so they
// rely on the LZ4_compressBound() sizing above. The #else/#endif lines are
// absent from this copy.
331 if (highCompression_) {
332 n = LZ4_compressHC(input, output, inputLength);
334 n = LZ4_compress(input, output, inputLength);
// Sanity check: the compressor must not report more bytes than the buffer
// holds.
339 CHECK_LE(n, out->capacity());
// Decompresses an LZ4 block into a single IOBuf of exactly the expected size.
// The expected size comes from one of two paths. The condition selecting
// between them is not present in this copy; presumably it is encodeSize().
//  - Decoded from the varint prefix: it must match any caller-supplied
//    length and must not exceed maxUncompressedLength().
//  - Taken from the caller: it is required and bounded by
//    maxUncompressedLength().
// LZ4_decompress_safe() must then produce exactly that many bytes; otherwise
// std::runtime_error is thrown.
345 std::unique_ptr<IOBuf> LZ4Codec::doUncompress(
347 uint64_t uncompressedLength) {
349 if (data->isChained()) {
350 // LZ4 doesn't support streaming, so we have to coalesce
351 clone = data->cloneCoalescedAsValue();
355 folly::io::Cursor cursor(data);
356 uint64_t actualUncompressedLength;
358 actualUncompressedLength = decodeVarintFromCursor(cursor);
// The varint comes from the input itself, so bound it exactly like a
// caller-supplied length. Without this, a forged prefix drives an arbitrarily
// large IOBuf::create() below. Sizes above INT_MAX would also be narrowed when
// passed as LZ4_decompress_safe()'s int capacity argument.
359 if ((uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
360 uncompressedLength != actualUncompressedLength) ||
actualUncompressedLength > maxUncompressedLength()) {
361 throw std::runtime_error("LZ4Codec: invalid uncompressed length");
364 actualUncompressedLength = uncompressedLength;
365 if (actualUncompressedLength == UNKNOWN_UNCOMPRESSED_LENGTH ||
366 actualUncompressedLength > maxUncompressedLength()) {
367 throw std::runtime_error("LZ4Codec: invalid uncompressed length");
// Decompress the bytes that follow the (optional) varint prefix.
371 auto sp = StringPiece{cursor.peekBytes()};
372 auto out = IOBuf::create(actualUncompressedLength);
373 int n = LZ4_decompress_safe(
375 reinterpret_cast<char*>(out->writableTail()),
377 actualUncompressedLength);
379 if (n < 0 || uint64_t(n) != actualUncompressedLength) {
380 throw std::runtime_error(to<std::string>(
381 "LZ4 decompression returned invalid value ", n));
383 out->append(actualUncompressedLength);
// LZ4 frame-format codec (CodecType::LZ4_FRAME). Holds a decompression
// context (dctx_) that is reused across doUncompress() calls when possible.
387 class LZ4FrameCodec final : public Codec {
389 static std::unique_ptr<Codec> create(int level, CodecType type);
390 explicit LZ4FrameCodec(int level, CodecType type);
394 std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
395 std::unique_ptr<IOBuf> doUncompress(
397 uint64_t uncompressedLength) override;
399 // Reset the dctx_ if it is dirty or null.
// Decompression context: starts null, is created by resetDCtx(), and is
// released in the destructor.
403 LZ4F_dctx* dctx_{nullptr};
407 /* static */ std::unique_ptr<Codec> LZ4FrameCodec::create(
410 return make_unique<LZ4FrameCodec>(level, type);
// Throws std::runtime_error with LZ4F_getErrorName() text when `code` is an
// LZ4F error. The non-error return statement is not present in this copy.
// Callers use the result as a size, so it presumably returns `code`.
413 static size_t lz4FrameThrowOnError(size_t code) {
414 if (LZ4F_isError(code)) {
415 throw std::runtime_error(
416 to<std::string>("LZ4Frame error: ", LZ4F_getErrorName(code)));
// Keeps an existing, clean dctx_. Otherwise it frees any existing context and
// creates a fresh one.
// NOTE(review): 100 appears to be the literal value of LZ4F_VERSION; confirm
// against lz4frame.h and prefer the named macro.
421 void LZ4FrameCodec::resetDCtx() {
422 if (dctx_ && !dirty_) {
426 LZ4F_freeDecompressionContext(dctx_);
428 lz4FrameThrowOnError(LZ4F_createDecompressionContext(&dctx_, 100));
// Maps COMPRESSION_LEVEL_* values to LZ4 frame compression levels. The
// per-case assignments and any validation are not present in this copy.
432 LZ4FrameCodec::LZ4FrameCodec(int level, CodecType type) : Codec(type) {
433 DCHECK(type == CodecType::LZ4_FRAME);
435 case COMPRESSION_LEVEL_FASTEST:
436 case COMPRESSION_LEVEL_DEFAULT:
439 case COMPRESSION_LEVEL_BEST:
// Releases the decompression context. Any null check (line 449) is not
// present in this copy.
448 LZ4FrameCodec::~LZ4FrameCodec() {
450 LZ4F_freeDecompressionContext(dctx_);
// Compresses `data` as a single LZ4 frame.
// - Chained input is coalesced into a contiguous clone first.
// - prefs is value-initialized, then given the compression level (level_)
//   and the content size to record in the frame header.
// - The output buffer is sized with LZ4F_compressFrameBound() for those prefs.
// - Only the `written` bytes are appended.
454 std::unique_ptr<IOBuf> LZ4FrameCodec::doCompress(const IOBuf* data) {
455 // LZ4 Frame compression doesn't support streaming so we have to coalesce
457 if (data->isChained()) {
458 clone = data->cloneCoalescedAsValue();
462 const auto uncompressedLength = data->length();
463 LZ4F_preferences_t prefs{};
464 prefs.compressionLevel = level_;
465 prefs.frameInfo.contentSize = uncompressedLength;
467 auto buf = IOBuf::create(LZ4F_compressFrameBound(uncompressedLength, &prefs));
468 const size_t written = lz4FrameThrowOnError(LZ4F_compressFrame(
474 buf->append(written);
// Decompresses one LZ4 frame into an IOBufQueue and returns the chain.
// - Chained input is coalesced first.
// - When the caller supplies uncompressedLength, it sizes the first
//   allocation (capped at 64 MiB).
// - The result must then have exactly that length, else std::runtime_error.
// NOTE(review): this copy is missing several lines (e.g. the resetDCtx() call,
// the dirty_ updates, the declaration of `clone`, and the decompression loop
// header/footer). Only the visible lines are reproduced here.
478 std::unique_ptr<IOBuf> LZ4FrameCodec::doUncompress(
480 uint64_t uncompressedLength) {
481 // Reset the dctx if any errors have occurred
484 ByteRange in = *data->begin();
486 if (data->isChained()) {
487 clone = data->cloneCoalescedAsValue();
488 in = clone.coalesce();
491 // Select decompression options
// Value-initialize so that every field except stableDst is zero.
// - lz4frame.h requires the reserved fields to be zero.
// - Newer lz4 releases give some of them meaning (e.g. skipChecksums).
// - Indeterminate stack bytes here could therefore silently change
//   decompression behavior.
492 LZ4F_decompressOptions_t options{};
493 options.stableDst = 1;
494 // Select blockSize and growthSize for the IOBufQueue
495 IOBufQueue queue(IOBufQueue::cacheChainLength());
// Defaults: 64 KiB per decompression step, queue growth of up to 4 MiB.
496 auto blockSize = uint64_t{64} << 10;
497 auto growthSize = uint64_t{4} << 20;
498 if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH) {
499 // Allocate uncompressedLength in one chunk (up to 64 MB)
500 const auto allocateSize = std::min(uncompressedLength, uint64_t{64} << 20);
501 queue.preallocate(allocateSize, allocateSize);
502 blockSize = std::min(uncompressedLength, blockSize);
503 growthSize = std::min(uncompressedLength, growthSize);
505 // Reduce growthSize for small data
// blockSize is uint64_t but in.size() is size_t. Name the type explicitly so
// std::max deduces on platforms where the two differ.
506 const auto guessUncompressedLen = 4 * std::max<uint64_t>(blockSize, in.size());
507 growthSize = std::min(guessUncompressedLen, growthSize);
509 // Once LZ4_decompress() is called, the dctx_ cannot be reused until it
512 // Decompress until the frame is over
515 // Allocate enough space to decompress at least a block
518 std::tie(out, outSize) = queue.preallocate(blockSize, growthSize);
// LZ4F_decompress() updates inSize/outSize to the bytes consumed/produced.
// A nonzero `code` means the frame is not finished yet.
520 size_t inSize = in.size();
521 code = lz4FrameThrowOnError(
522 LZ4F_decompress(dctx_, out, &outSize, in.data(), &inSize, &options));
523 if (in.empty() && outSize == 0 && code != 0) {
524 // We passed no input, no output was produced, and the frame isn't over
525 // No more forward progress is possible
526 throw std::runtime_error("LZ4Frame error: Incomplete frame");
528 in.uncheckedAdvance(inSize);
529 queue.postallocate(outSize);
531 // At this point the decompression context can be reused
533 if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
534 queue.chainLength() != uncompressedLength) {
535 throw std::runtime_error("LZ4Frame error: Invalid uncompressedLength");
540 #endif // FOLLY_HAVE_LIBLZ4
542 #if FOLLY_HAVE_LIBSNAPPY
549 * Implementation of snappy::Source that reads from an IOBuf chain.
// NOTE(review): the opening and closing lines of the doc comment above are
// not present in this copy (numbering jumps 549 -> 551).
551 class IOBufSnappySource final : public snappy::Source {
553 explicit IOBufSnappySource(const IOBuf* data);
554 size_t Available() const override;
555 const char* Peek(size_t* len) override;
556 void Skip(size_t n) override;
// Starts with the whole chain's length available. The remaining member
// initializer(s) are not present in this copy.
562 IOBufSnappySource::IOBufSnappySource(const IOBuf* data)
563 : available_(data->computeChainDataLength()),
// Number of bytes still available to read. The body is not present in this
// copy; it presumably returns available_.
567 size_t IOBufSnappySource::Available() const {
// Exposes the contiguous bytes at the cursor's current position (via
// cursor_.peekBytes()). It presumably reports their count through `len`.
571 const char* IOBufSnappySource::Peek(size_t* len) {
572 auto sp = StringPiece{cursor_.peekBytes()};
// Consumes `n` bytes; CHECK-fails if `n` exceeds available_.
577 void IOBufSnappySource::Skip(size_t n) {
578 CHECK_LE(n, available_);
// Snappy codec (CodecType::SNAPPY). Only the symbolic COMPRESSION_LEVEL_*
// values are accepted; the visible compression code passes no level to snappy.
583 class SnappyCodec final : public Codec {
585 static std::unique_ptr<Codec> create(int level, CodecType type);
586 explicit SnappyCodec(int level, CodecType type);
589 uint64_t doMaxUncompressedLength() const override;
590 std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
591 std::unique_ptr<IOBuf> doUncompress(
593 uint64_t uncompressedLength) override;
596 std::unique_ptr<Codec> SnappyCodec::create(int level, CodecType type) {
597 return make_unique<SnappyCodec>(level, type);
// Accepts COMPRESSION_LEVEL_FASTEST/DEFAULT/BEST. Other levels presumably
// reach the throw below via a default label that is not present in this copy.
600 SnappyCodec::SnappyCodec(int level, CodecType type) : Codec(type) {
601 DCHECK(type == CodecType::SNAPPY);
603 case COMPRESSION_LEVEL_FASTEST:
604 case COMPRESSION_LEVEL_DEFAULT:
605 case COMPRESSION_LEVEL_BEST:
609 throw std::invalid_argument(to<std::string>(
610 "SnappyCodec: invalid level: ", level));
// Largest input accepted: the maximum uint32_t value.
614 uint64_t SnappyCodec::doMaxUncompressedLength() const {
615 // snappy.h uses uint32_t for lengths, so there's that.
616 return std::numeric_limits<uint32_t>::max();
// Compresses the chain by streaming it through IOBufSnappySource into one
// output buffer sized with snappy::MaxCompressedLength(). The sink is an
// UncheckedByteArraySink writing at the buffer's tail; as its name says, it
// does no bounds checking, so the buffer is sized for the worst case up front.
// The CHECK guards the reported size.
619 std::unique_ptr<IOBuf> SnappyCodec::doCompress(const IOBuf* data) {
620 IOBufSnappySource source(data);
622 IOBuf::create(snappy::MaxCompressedLength(source.Available()));
624 snappy::UncheckedByteArraySink sink(reinterpret_cast<char*>(
625 out->writableTail()));
627 size_t n = snappy::Compress(&source, &sink);
629 CHECK_LE(n, out->capacity());
// Decompresses a snappy stream in two passes, each over a fresh
// IOBufSnappySource:
//  1. snappy::GetUncompressedLength() reads the length from the stream. It
//     must match a caller-supplied length, if any.
//  2. snappy::RawUncompress() writes into a buffer of exactly that size.
// Failures of either step throw std::runtime_error.
// NOTE(review): the stream-provided length (up to 4 GiB) is allocated up
// front without a tighter bound.
634 std::unique_ptr<IOBuf> SnappyCodec::doUncompress(const IOBuf* data,
635 uint64_t uncompressedLength) {
636 uint32_t actualUncompressedLength = 0;
639 IOBufSnappySource source(data);
640 if (!snappy::GetUncompressedLength(&source, &actualUncompressedLength)) {
641 throw std::runtime_error("snappy::GetUncompressedLength failed");
643 if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
644 uncompressedLength != actualUncompressedLength) {
645 throw std::runtime_error("snappy: invalid uncompressed length");
649 auto out = IOBuf::create(actualUncompressedLength);
652 IOBufSnappySource source(data);
653 if (!snappy::RawUncompress(&source,
654 reinterpret_cast<char*>(out->writableTail()))) {
655 throw std::runtime_error("snappy::RawUncompress failed");
659 out->append(actualUncompressedLength);
663 #endif // FOLLY_HAVE_LIBSNAPPY
// zlib codec for CodecType::ZLIB and CodecType::GZIP. type() selects the
// windowBits passed to deflateInit2()/inflateInit2().
669 class ZlibCodec final : public Codec {
671 static std::unique_ptr<Codec> create(int level, CodecType type);
672 explicit ZlibCodec(int level, CodecType type);
675 std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
676 std::unique_ptr<IOBuf> doUncompress(
678 uint64_t uncompressedLength) override;
680 std::unique_ptr<IOBuf> addOutputBuffer(z_stream* stream, uint32_t length);
681 bool doInflate(z_stream* stream, IOBuf* head, uint32_t bufferLength);
686 std::unique_ptr<Codec> ZlibCodec::create(int level, CodecType type) {
687 return make_unique<ZlibCodec>(level, type);
// Maps COMPRESSION_LEVEL_DEFAULT to Z_DEFAULT_COMPRESSION (the FASTEST/BEST
// assignments are not present in this copy). It then accepts only
// Z_DEFAULT_COMPRESSION or a level in [0, 9]; anything else throws
// std::invalid_argument.
690 ZlibCodec::ZlibCodec(int level, CodecType type) : Codec(type) {
691 DCHECK(type == CodecType::ZLIB || type == CodecType::GZIP);
693 case COMPRESSION_LEVEL_FASTEST:
696 case COMPRESSION_LEVEL_DEFAULT:
697 level = Z_DEFAULT_COMPRESSION;
699 case COMPRESSION_LEVEL_BEST:
703 if (level != Z_DEFAULT_COMPRESSION && (level < 0 || level > 9)) {
704 throw std::invalid_argument(to<std::string>(
705 "ZlibCodec: invalid level: ", level));
// Allocates a `length`-byte IOBuf and makes it the stream's output window
// (next_out/avail_out). CHECK-fails if the previous window is not full.
// NOTE(review): avail_out is set from buf->length(), but the line that would
// append `length` bytes to the new buffer (numbering gap 714 -> 717) is
// presumably missing from this copy.
710 std::unique_ptr<IOBuf> ZlibCodec::addOutputBuffer(z_stream* stream,
712 CHECK_EQ(stream->avail_out, 0);
714 auto buf = IOBuf::create(length);
717 stream->next_out = buf->writableData();
718 stream->avail_out = buf->length();
// Runs one inflate(Z_NO_FLUSH) step. When the current window is full, it first
// adds a fresh `bufferLength` output buffer to `head`'s chain via
// prependChain().
// - Some return codes are reported as std::runtime_error carrying zlib's
//   message.
// - An unexpected code reaches CHECK(false).
// The switch that classifies `rc` and the return value (presumably true at
// end of stream) are not present in this copy.
723 bool ZlibCodec::doInflate(z_stream* stream,
725 uint32_t bufferLength) {
726 if (stream->avail_out == 0) {
727 head->prependChain(addOutputBuffer(stream, bufferLength));
730 int rc = inflate(stream, Z_NO_FLUSH);
741 throw std::runtime_error(to<std::string>(
742 "ZlibCodec: inflate error: ", rc, ": ", stream->msg));
744 CHECK(false) << rc << ": " << stream->msg;
// Deflates the chain (zlib or gzip wrapper, chosen by type()) into an IOBuf
// chain.
// - The first output buffer is deflateBound() bytes when that fits in 64 MiB,
//   else 4 MiB; further 4 MiB buffers are chained as needed.
// - Each input range is fed in uint32_t-sized steps of at most 64 MiB.
// - Z_FINISH is repeated until Z_STREAM_END.
// - deflateEnd() runs on exit via a guard whose opening line is not present in
//   this copy (presumably SCOPE_EXIT). A Z_DATA_ERROR there is tolerated only
//   when `success` was never set.
// NOTE(review): several lines (the z_stream declaration, the deflateInit2
// argument list, the step-loop bookkeeping, closing braces and the return)
// are absent from this copy; numbering gaps mark them.
750 std::unique_ptr<IOBuf> ZlibCodec::doCompress(const IOBuf* data) {
752 stream.zalloc = nullptr;
753 stream.zfree = nullptr;
754 stream.opaque = nullptr;
756 // Using deflateInit2() to support gzip. "The windowBits parameter is the
757 // base two logarithm of the maximum window size (...) The default value is
758 // 15 (...) Add 16 to windowBits to write a simple gzip header and trailer
759 // around the compressed data instead of a zlib wrapper. The gzip header
760 // will have no file name, no extra data, no comment, no modification time
761 // (set to zero), no header crc, and the operating system will be set to 255
763 int windowBits = 15 + (type() == CodecType::GZIP ? 16 : 0);
764 // All other parameters (method, memLevel, strategy) get default values from
766 int rc = deflateInit2(&stream,
773 throw std::runtime_error(to<std::string>(
774 "ZlibCodec: deflateInit error: ", rc, ": ", stream.msg));
// Start from an empty stream state; buffers are attached below.
777 stream.next_in = stream.next_out = nullptr;
778 stream.avail_in = stream.avail_out = 0;
779 stream.total_in = stream.total_out = 0;
781 bool success = false;
784 rc = deflateEnd(&stream);
785 // If we're here because of an exception, it's okay if some data
787 CHECK(rc == Z_OK || (!success && rc == Z_DATA_ERROR))
788 << rc << ": " << stream.msg;
791 uint64_t uncompressedLength = data->computeChainDataLength();
792 uint64_t maxCompressedLength = deflateBound(&stream, uncompressedLength);
794 // Max 64MiB in one go
795 constexpr uint32_t maxSingleStepLength = uint32_t(64) << 20; // 64MiB
796 constexpr uint32_t defaultBufferLength = uint32_t(4) << 20; // 4MiB
798 auto out = addOutputBuffer(
800 (maxCompressedLength <= maxSingleStepLength ?
801 maxCompressedLength :
802 defaultBufferLength));
804 for (auto& range : *data) {
805 uint64_t remaining = range.size();
806 uint64_t written = 0;
// Feed at most maxSingleStepLength bytes of this range per step.
808 uint32_t step = (remaining > maxSingleStepLength ?
809 maxSingleStepLength : remaining);
810 stream.next_in = const_cast<uint8_t*>(range.data() + written);
811 stream.avail_in = step;
815 while (stream.avail_in != 0) {
816 if (stream.avail_out == 0) {
817 out->prependChain(addOutputBuffer(&stream, defaultBufferLength));
820 rc = deflate(&stream, Z_NO_FLUSH);
822 CHECK_EQ(rc, Z_OK) << stream.msg;
// Flush: keep calling deflate(Z_FINISH), adding output space as needed,
// until it stops returning Z_OK.
828 if (stream.avail_out == 0) {
829 out->prependChain(addOutputBuffer(&stream, defaultBufferLength));
832 rc = deflate(&stream, Z_FINISH);
833 } while (rc == Z_OK);
835 CHECK_EQ(rc, Z_STREAM_END) << stream.msg;
// Drop the unused tail of the last output buffer.
837 out->prev()->trimEnd(stream.avail_out);
839 success = true; // we survived
// Inflates a zlib or gzip stream (type() selects windowBits) into an IOBuf
// chain.
// - The first output buffer holds uncompressedLength bytes when that is known
//   and at most 64 MiB, otherwise 4 MiB.
// - doInflate() adds further 4 MiB buffers as needed.
// - Throws when input remains after the end of the stream.
// - Throws when a known uncompressedLength differs from stream.total_out.
// - inflateEnd() runs on exit via a guard whose opening line is not present
//   in this copy (presumably SCOPE_EXIT).
// NOTE(review): unlike the compression path, no splitting of an input range
// into smaller steps is visible here. A single range larger than 4 GiB would
// be truncated when assigned to zlib's 32-bit avail_in.
844 std::unique_ptr<IOBuf> ZlibCodec::doUncompress(const IOBuf* data,
845 uint64_t uncompressedLength) {
847 stream.zalloc = nullptr;
848 stream.zfree = nullptr;
849 stream.opaque = nullptr;
851 // "The windowBits parameter is the base two logarithm of the maximum window
852 // size (...) The default value is 15 (...) add 16 to decode only the gzip
853 // format (the zlib format will return a Z_DATA_ERROR)."
854 int windowBits = 15 + (type() == CodecType::GZIP ? 16 : 0);
855 int rc = inflateInit2(&stream, windowBits);
857 throw std::runtime_error(to<std::string>(
858 "ZlibCodec: inflateInit error: ", rc, ": ", stream.msg));
861 stream.next_in = stream.next_out = nullptr;
862 stream.avail_in = stream.avail_out = 0;
863 stream.total_in = stream.total_out = 0;
865 bool success = false;
868 rc = inflateEnd(&stream);
869 // If we're here because of an exception, it's okay if some data
871 CHECK(rc == Z_OK || (!success && rc == Z_DATA_ERROR))
872 << rc << ": " << stream.msg;
875 // Max 64MiB in one go
876 constexpr uint32_t maxSingleStepLength = uint32_t(64) << 20; // 64MiB
877 constexpr uint32_t defaultBufferLength = uint32_t(4) << 20; // 4MiB
879 auto out = addOutputBuffer(
881 ((uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
882 uncompressedLength <= maxSingleStepLength) ?
884 defaultBufferLength));
886 bool streamEnd = false;
887 for (auto& range : *data) {
892 stream.next_in = const_cast<uint8_t*>(range.data());
893 stream.avail_in = range.size();
895 while (stream.avail_in != 0) {
897 throw std::runtime_error(to<std::string>(
898 "ZlibCodec: junk after end of data"));
901 streamEnd = doInflate(&stream, out.get(), defaultBufferLength);
// After all input is consumed, keep inflating until the stream ends.
906 streamEnd = doInflate(&stream, out.get(), defaultBufferLength);
909 out->prev()->trimEnd(stream.avail_out);
911 if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
912 uncompressedLength != stream.total_out) {
913 throw std::runtime_error(to<std::string>(
914 "ZlibCodec: invalid uncompressed length"));
917 success = true; // we survived
922 #endif // FOLLY_HAVE_LIBZ
924 #if FOLLY_HAVE_LIBLZMA
// LZMA2 codec for CodecType::LZMA2 and CodecType::LZMA2_VARINT_SIZE.
// encodeSize() is true for LZMA2_VARINT_SIZE, which carries the uncompressed
// size as a varint prefix.
929 class LZMA2Codec final : public Codec {
931 static std::unique_ptr<Codec> create(int level, CodecType type);
932 explicit LZMA2Codec(int level, CodecType type);
935 bool doNeedsUncompressedLength() const override;
936 uint64_t doMaxUncompressedLength() const override;
938 bool encodeSize() const { return type() == CodecType::LZMA2_VARINT_SIZE; }
940 std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
941 std::unique_ptr<IOBuf> doUncompress(
943 uint64_t uncompressedLength) override;
945 std::unique_ptr<IOBuf> addOutputBuffer(lzma_stream* stream, size_t length);
946 bool doInflate(lzma_stream* stream, IOBuf* head, size_t bufferLength);
951 std::unique_ptr<Codec> LZMA2Codec::create(int level, CodecType type) {
952 return make_unique<LZMA2Codec>(level, type);
// Maps COMPRESSION_LEVEL_DEFAULT to LZMA_PRESET_DEFAULT (the FASTEST/BEST
// assignments are not present in this copy). The level must then be in
// [0, 9], else std::invalid_argument is thrown.
955 LZMA2Codec::LZMA2Codec(int level, CodecType type) : Codec(type) {
956 DCHECK(type == CodecType::LZMA2 || type == CodecType::LZMA2_VARINT_SIZE);
958 case COMPRESSION_LEVEL_FASTEST:
961 case COMPRESSION_LEVEL_DEFAULT:
962 level = LZMA_PRESET_DEFAULT;
964 case COMPRESSION_LEVEL_BEST:
968 if (level < 0 || level > 9) {
969 throw std::invalid_argument(to<std::string>(
970 "LZMA2Codec: invalid level: ", level));
// Body not present in this copy (numbering jumps 975 -> 979).
975 bool LZMA2Codec::doNeedsUncompressedLength() const {
979 uint64_t LZMA2Codec::doMaxUncompressedLength() const {
980 // From lzma/base.h: "Stream is roughly 8 EiB (2^63 bytes)"
981 return uint64_t(1) << 63;
// Allocates a `length`-byte IOBuf and makes it the lzma_stream's output
// window. CHECK-fails if the previous window is not full.
// NOTE(review): avail_out is set from buf->length(); the line appending
// `length` bytes (numbering gap 990 -> 993) is presumably missing here.
984 std::unique_ptr<IOBuf> LZMA2Codec::addOutputBuffer(
988 CHECK_EQ(stream->avail_out, 0);
990 auto buf = IOBuf::create(length);
993 stream->next_out = buf->writableData();
994 stream->avail_out = buf->length();
// Compresses the chain with liblzma's easy encoder at level_, with no
// integrity check (LZMA_CHECK_NONE).
// - The first output buffer is lzma_stream_buffer_bound() bytes when that
//   fits in 64 MiB, else 4 MiB; more 4 MiB buffers are chained as needed.
// - When the size is encoded, a varint of the input length is placed in its
//   own buffer at the front of the chain. The guarding condition is not
//   present in this copy; presumably encodeSize().
// - lzma_end() runs on every exit via SCOPE_EXIT.
// - Empty ranges are presumably skipped; the body of that `if` is not
//   present in this copy.
999 std::unique_ptr<IOBuf> LZMA2Codec::doCompress(const IOBuf* data) {
1001 lzma_stream stream = LZMA_STREAM_INIT;
1003 rc = lzma_easy_encoder(&stream, level_, LZMA_CHECK_NONE);
1004 if (rc != LZMA_OK) {
1005 throw std::runtime_error(folly::to<std::string>(
1006 "LZMA2Codec: lzma_easy_encoder error: ", rc));
1009 SCOPE_EXIT { lzma_end(&stream); };
1011 uint64_t uncompressedLength = data->computeChainDataLength();
1012 uint64_t maxCompressedLength = lzma_stream_buffer_bound(uncompressedLength);
1014 // Max 64MiB in one go
1015 constexpr uint32_t maxSingleStepLength = uint32_t(64) << 20; // 64MiB
1016 constexpr uint32_t defaultBufferLength = uint32_t(4) << 20; // 4MiB
1018 auto out = addOutputBuffer(
1020 (maxCompressedLength <= maxSingleStepLength ?
1021 maxCompressedLength :
1022 defaultBufferLength));
// Prepend the varint size header as its own buffer.
1025 auto size = IOBuf::createCombined(kMaxVarintLength64);
1026 encodeVarintToIOBuf(uncompressedLength, size.get());
1027 size->appendChain(std::move(out));
1028 out = std::move(size);
1031 for (auto& range : *data) {
1032 if (range.empty()) {
1036 stream.next_in = const_cast<uint8_t*>(range.data());
1037 stream.avail_in = range.size();
1039 while (stream.avail_in != 0) {
1040 if (stream.avail_out == 0) {
1041 out->prependChain(addOutputBuffer(&stream, defaultBufferLength));
1044 rc = lzma_code(&stream, LZMA_RUN);
1046 if (rc != LZMA_OK) {
1047 throw std::runtime_error(folly::to<std::string>(
1048 "LZMA2Codec: lzma_code error: ", rc));
// Flush with LZMA_FINISH until the encoder stops returning LZMA_OK.
1054 if (stream.avail_out == 0) {
1055 out->prependChain(addOutputBuffer(&stream, defaultBufferLength));
1058 rc = lzma_code(&stream, LZMA_FINISH);
1059 } while (rc == LZMA_OK);
1061 if (rc != LZMA_STREAM_END) {
1062 throw std::runtime_error(folly::to<std::string>(
1063 "LZMA2Codec: lzma_code ended with error: ", rc));
// Drop the unused tail of the last output buffer.
1066 out->prev()->trimEnd(stream.avail_out);
// Runs one lzma_code(LZMA_RUN) step. When the current window is full, it
// first adds a fresh `bufferLength` output buffer to `head`'s chain via
// prependChain().
// - LZMA_STREAM_END is handled as its own case.
// - Codes reaching the throw below (presumably via a default label) raise
//   std::runtime_error.
// The other case labels and return statements are not present in this copy.
1071 bool LZMA2Codec::doInflate(lzma_stream* stream,
1073 size_t bufferLength) {
1074 if (stream->avail_out == 0) {
1075 head->prependChain(addOutputBuffer(stream, bufferLength));
1078 lzma_ret rc = lzma_code(stream, LZMA_RUN);
1083 case LZMA_STREAM_END:
1086 throw std::runtime_error(to<std::string>(
1087 "LZMA2Codec: lzma_code error: ", rc));
// Decodes the compressed data with lzma_auto_decoder(), with memory limit
// UINT64_MAX and no flags.
// - The varint size prefix is decoded first (under a condition not present
//   in this copy; presumably encodeSize()) and must match any caller-supplied
//   length.
// - Output starts with one buffer of the expected size when that is known and
//   at most 64 MiB, else 256 KiB, and grows in 256 KiB buffers.
// - Input past the end of the stream throws "junk after end of data".
// - A known length must equal stream.total_out.
// - lzma_end() runs on every exit via SCOPE_EXIT.
1093 std::unique_ptr<IOBuf> LZMA2Codec::doUncompress(const IOBuf* data,
1094 uint64_t uncompressedLength) {
1096 lzma_stream stream = LZMA_STREAM_INIT;
1098 rc = lzma_auto_decoder(&stream, std::numeric_limits<uint64_t>::max(), 0);
1099 if (rc != LZMA_OK) {
1100 throw std::runtime_error(folly::to<std::string>(
1101 "LZMA2Codec: lzma_auto_decoder error: ", rc));
1104 SCOPE_EXIT { lzma_end(&stream); };
1106 // Max 64MiB in one go
1107 constexpr uint32_t maxSingleStepLength = uint32_t(64) << 20; // 64MiB
1108 constexpr uint32_t defaultBufferLength = uint32_t(256) << 10; // 256 KiB
1110 folly::io::Cursor cursor(data);
1112 const uint64_t actualUncompressedLength = decodeVarintFromCursor(cursor);
1113 if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
1114 uncompressedLength != actualUncompressedLength) {
1115 throw std::runtime_error("LZMA2Codec: invalid uncompressed length");
1117 uncompressedLength = actualUncompressedLength;
1120 auto out = addOutputBuffer(
1122 ((uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
1123 uncompressedLength <= maxSingleStepLength)
1124 ? uncompressedLength
1125 : defaultBufferLength));
// Feed the remaining input one contiguous cursor segment at a time.
1127 bool streamEnd = false;
1128 auto buf = cursor.peekBytes();
1129 while (!buf.empty()) {
1130 stream.next_in = const_cast<uint8_t*>(buf.data());
1131 stream.avail_in = buf.size();
1133 while (stream.avail_in != 0) {
1135 throw std::runtime_error(to<std::string>(
1136 "LZMA2Codec: junk after end of data"));
1139 streamEnd = doInflate(&stream, out.get(), defaultBufferLength);
1142 cursor.skip(buf.size());
1143 buf = cursor.peekBytes();
// Input exhausted: keep decoding until the stream reports its end.
1146 while (!streamEnd) {
1147 streamEnd = doInflate(&stream, out.get(), defaultBufferLength);
1150 out->prev()->trimEnd(stream.avail_out);
1152 if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
1153 uncompressedLength != stream.total_out) {
1154 throw std::runtime_error(
1155 to<std::string>("LZMA2Codec: invalid uncompressed length"));
1161 #endif // FOLLY_HAVE_LIBLZMA
// Compile the zstd codec only when FOLLY_HAVE_LIBZSTD is nonzero. This must be
// `#if`, matching the top-of-file `#if FOLLY_HAVE_LIBZSTD` guard and the
// codecFactories entry. With `#ifdef`, a build that defines FOLLY_HAVE_LIBZSTD
// as 0 would compile this section without the zstd headers.
1163 #if FOLLY_HAVE_LIBZSTD
// zstd codec (CodecType::ZSTD).
1168 class ZSTDCodec final : public Codec {
1170 static std::unique_ptr<Codec> create(int level, CodecType);
1171 explicit ZSTDCodec(int level, CodecType type);
1174 bool doNeedsUncompressedLength() const override;
1175 std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
1176 std::unique_ptr<IOBuf> doUncompress(
1178 uint64_t uncompressedLength) override;
1183 std::unique_ptr<Codec> ZSTDCodec::create(int level, CodecType type) {
1184 return make_unique<ZSTDCodec>(level, type);
// Maps COMPRESSION_LEVEL_* to zstd levels (the per-case assignments are not
// present in this copy). The final level must lie in [1, ZSTD_maxCLevel()],
// else std::invalid_argument is thrown.
1187 ZSTDCodec::ZSTDCodec(int level, CodecType type) : Codec(type) {
1188 DCHECK(type == CodecType::ZSTD);
1190 case COMPRESSION_LEVEL_FASTEST:
1193 case COMPRESSION_LEVEL_DEFAULT:
1196 case COMPRESSION_LEVEL_BEST:
1200 if (level < 1 || level > ZSTD_maxCLevel()) {
1201 throw std::invalid_argument(
1202 to<std::string>("ZSTD: invalid level: ", level));
// Body not present in this copy (numbering jumps 1207 -> 1211).
1207 bool ZSTDCodec::doNeedsUncompressedLength() const {
// Throws std::runtime_error with ZSTD_getErrorName() text when `rc` is a zstd
// error code. For non-error codes it presumably returns early; that return
// line is not present in this copy.
1211 void zstdThrowIfError(size_t rc) {
1212 if (!ZSTD_isError(rc)) {
1215 throw std::runtime_error(
1216 to<std::string>("ZSTD returned an error: ", ZSTD_getErrorName(rc)));
// Compresses with zstd at level_.
// - A single, unchained IOBuf goes through one-shot ZSTD_compress() into a
//   ZSTD_compressBound()-sized buffer (see the compatibility comment below).
// - Chained input is streamed buffer by buffer through a ZSTD_CStream into
//   one buffer sized with ZSTD_compressBound() of the chain's total length,
//   then finished with ZSTD_endStream().
// - The stream is released by ZSTD_freeCStream(); its enclosing guard line is
//   not present in this copy.
// NOTE(review): ZSTD_endStream() returns a nonzero count when data remains
// unflushed. Only errors are checked in the visible lines, which relies on the
// ZSTD_compressBound()-sized output being large enough.
1219 std::unique_ptr<IOBuf> ZSTDCodec::doCompress(const IOBuf* data) {
1220 // Support earlier versions of the codec (working with a single IOBuf,
1221 // and using ZSTD_decompress which requires ZSTD frame to contain size,
1222 // which isn't populated by streaming API).
1223 if (!data->isChained()) {
1224 auto out = IOBuf::createCombined(ZSTD_compressBound(data->length()));
1225 const auto rc = ZSTD_compress(
1226 out->writableData(),
1231 zstdThrowIfError(rc);
1236 auto zcs = ZSTD_createCStream();
1238 ZSTD_freeCStream(zcs);
1241 auto rc = ZSTD_initCStream(zcs, level_);
1242 zstdThrowIfError(rc);
1244 Cursor cursor(data);
1245 auto result = IOBuf::createCombined(ZSTD_compressBound(cursor.totalLength()));
1248 out.dst = result->writableTail();
1249 out.size = result->capacity();
// Feed each contiguous segment until zstd has consumed all of it.
1252 for (auto buffer = cursor.peekBytes(); !buffer.empty();) {
1254 in.src = buffer.data();
1255 in.size = buffer.size();
1256 for (in.pos = 0; in.pos != in.size;) {
1257 rc = ZSTD_compressStream(zcs, &out, &in);
1258 zstdThrowIfError(rc);
1260 cursor.skip(in.size);
1261 buffer = cursor.peekBytes();
1264 rc = ZSTD_endStream(zcs, &out);
1265 zstdThrowIfError(rc);
1268 result->append(out.pos);
// One-shot decompression of a single, unchained IOBuf whose uncompressed
// length is known; both preconditions are DCHECKed. Allocates exactly
// uncompressedLength bytes and requires ZSTD_decompress() to produce exactly
// that many, else std::runtime_error.
// NOTE(review): ZSTDCodec::doUncompress() may pass a length read from the
// frame header, so this allocation size can be input-controlled.
1272 static std::unique_ptr<IOBuf> zstdUncompressBuffer(
1274 uint64_t uncompressedLength) {
1275 // Check preconditions
1276 DCHECK(!data->isChained());
1277 DCHECK(uncompressedLength != Codec::UNKNOWN_UNCOMPRESSED_LENGTH);
1279 auto uncompressed = IOBuf::create(uncompressedLength);
1280 const auto decompressedSize = ZSTD_decompress(
1281 uncompressed->writableTail(),
1282 uncompressed->tailroom(),
1285 zstdThrowIfError(decompressedSize);
1286 if (decompressedSize != uncompressedLength) {
1287 throw std::runtime_error("ZSTD: invalid uncompressed length");
1289 uncompressed->append(decompressedSize);
1290 return uncompressed;
// Streaming zstd decompression through a ZSTD_DStream into an IOBufQueue.
// Used for chained input or an unknown length.
// - The first output allocation is uncompressedLength when known; later ones
//   are ZSTD_DStreamOutSize().
// - Input is pulled one cursor segment at a time.
// - Throws "incomplete input" when rc > 1 but no more input remains.
// - Throws "junk after end of data" when input is left over.
// - A known length must match the output length exactly.
// NOTE(review): a known uncompressedLength is preallocated in one piece with
// no cap. The loop header/footer and the DStream-free guard line are not
// present in this copy.
1293 static std::unique_ptr<IOBuf> zstdUncompressStream(
1295 uint64_t uncompressedLength) {
1296 auto zds = ZSTD_createDStream();
1298 ZSTD_freeDStream(zds);
1301 auto rc = ZSTD_initDStream(zds);
1302 zstdThrowIfError(rc);
1304 ZSTD_outBuffer out{};
1307 auto outputSize = ZSTD_DStreamOutSize();
1308 if (uncompressedLength != Codec::UNKNOWN_UNCOMPRESSED_LENGTH) {
1309 outputSize = uncompressedLength;
1312 IOBufQueue queue(IOBufQueue::cacheChainLength());
1314 Cursor cursor(data);
// Refill the input window from the next cursor segment once it is consumed.
1316 if (in.pos == in.size) {
1317 auto buffer = cursor.peekBytes();
1318 in.src = buffer.data();
1319 in.size = buffer.size();
1321 cursor.skip(in.size);
1322 if (rc > 1 && in.size == 0) {
1323 throw std::runtime_error(to<std::string>("ZSTD: incomplete input"));
// Commit the filled output window and allocate the next one.
1326 if (out.pos == out.size) {
1328 queue.postallocate(out.pos);
1330 auto buffer = queue.preallocate(outputSize, outputSize);
1331 out.dst = buffer.first;
1332 out.size = buffer.second;
1334 outputSize = ZSTD_DStreamOutSize();
1336 rc = ZSTD_decompressStream(zds, &out, &in);
1337 zstdThrowIfError(rc);
1343 queue.postallocate(out.pos);
1345 if (in.pos != in.size || !cursor.isAtEnd()) {
1346 throw std::runtime_error("ZSTD: junk after end of data");
1348 if (uncompressedLength != Codec::UNKNOWN_UNCOMPRESSED_LENGTH &&
1349 queue.chainLength() != uncompressedLength) {
1350 throw std::runtime_error("ZSTD: invalid uncompressed length");
1353 return queue.move();
// Chooses between one-shot and streaming zstd decompression.
// - If ZSTD_getDecompressedSize() on the first IOBuf is nonzero, that size
//   must agree with any caller-supplied length and is then used as the
//   length.
// - A known length with unchained input uses zstdUncompressBuffer().
// - Everything else uses zstdUncompressStream().
// NOTE(review): the frame-header size comes from the input itself and is not
// compared against any maximum before zstdUncompressBuffer() allocates it. No
// doMaxUncompressedLength() override is visible for ZSTDCodec in this copy.
// NOTE(review): per zstd.h, a zero result also covers "size not recorded" and
// header errors, so those cases fall through to the caller's length.
1356 std::unique_ptr<IOBuf> ZSTDCodec::doUncompress(
1358 uint64_t uncompressedLength) {
1360 // Read decompressed size from frame if available in first IOBuf.
1361 const auto decompressedSize =
1362 ZSTD_getDecompressedSize(data->data(), data->length());
1363 if (decompressedSize != 0) {
1364 if (uncompressedLength != Codec::UNKNOWN_UNCOMPRESSED_LENGTH &&
1365 uncompressedLength != decompressedSize) {
1366 throw std::runtime_error("ZSTD: invalid uncompressed length");
1368 uncompressedLength = decompressedSize;
1371 // Faster to decompress using ZSTD_decompress() if we can.
1372 if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH && !data->isChained()) {
1373 return zstdUncompressBuffer(data, uncompressedLength);
1375 // Fall back to slower streaming decompression.
1376 return zstdUncompressStream(data, uncompressedLength);
1379 #endif // FOLLY_HAVE_LIBZSTD
// Factory table indexed by static_cast<size_t>(CodecType). A nullptr entry
// means the codec type is unavailable in this build: hasCodec() reports false
// for it and getCodec() throws "not supported". Entries are conditionally
// compiled on the same FOLLY_HAVE_* macros as the codec implementations.
// NOTE(review): most entries and the #else/#endif lines are absent from this
// copy, so the order cannot be checked against CodecType here.
1383 typedef std::unique_ptr<Codec> (*CodecFactory)(int, CodecType);
1384 static constexpr CodecFactory
1385 codecFactories[static_cast<size_t>(CodecType::NUM_CODEC_TYPES)] = {
1386 nullptr, // USER_DEFINED
1387 NoCompressionCodec::create,
1389 #if FOLLY_HAVE_LIBLZ4
1395 #if FOLLY_HAVE_LIBSNAPPY
1396 SnappyCodec::create,
1407 #if FOLLY_HAVE_LIBLZ4
1413 #if FOLLY_HAVE_LIBLZMA
1421 #if FOLLY_HAVE_LIBZSTD
1433 #if FOLLY_HAVE_LIBLZ4
1434 LZ4FrameCodec::create,
// Returns whether a factory is registered for `type` in this build.
// Throws std::invalid_argument when `type` is not below NUM_CODEC_TYPES.
1440 bool hasCodec(CodecType type) {
1441 size_t idx = static_cast<size_t>(type);
1442 if (idx >= static_cast<size_t>(CodecType::NUM_CODEC_TYPES)) {
1443 throw std::invalid_argument(
1444 to<std::string>("Compression type ", idx, " invalid"));
1446 return codecFactories[idx] != nullptr;
1449 std::unique_ptr<Codec> getCodec(CodecType type, int level) {
1450 size_t idx = static_cast<size_t>(type);
1451 if (idx >= static_cast<size_t>(CodecType::NUM_CODEC_TYPES)) {
1452 throw std::invalid_argument(
1453 to<std::string>("Compression type ", idx, " invalid"));
1455 auto factory = codecFactories[idx];
1457 throw std::invalid_argument(to<std::string>(
1458 "Compression type ", idx, " not supported"));
1460 auto codec = (*factory)(level, type);
1461 DCHECK_EQ(static_cast<size_t>(codec->type()), idx);