2 * Copyright 2016 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #include <folly/io/Compression.h>
24 #include <glog/logging.h>
26 #if FOLLY_HAVE_LIBSNAPPY
28 #include <snappy-sinksource.h>
35 #if FOLLY_HAVE_LIBLZMA
39 #if FOLLY_HAVE_LIBZSTD
43 #include <folly/Conv.h>
44 #include <folly/Memory.h>
45 #include <folly/Portability.h>
46 #include <folly/ScopeGuard.h>
47 #include <folly/Varint.h>
48 #include <folly/io/Cursor.h>
50 namespace folly { namespace io {
// Base-class constructor: stores the codec type in type_.
52 Codec::Codec(CodecType type) : type_(type) { }
54 // Ensure consistent behavior in the nullptr case
55 std::unique_ptr<IOBuf> Codec::compress(const IOBuf* data) {
56 uint64_t len = data->computeChainDataLength();
58 return IOBuf::create(0);
59 } else if (len > maxUncompressedLength()) {
60 throw std::runtime_error("Codec: uncompressed length too large");
63 return doCompress(data);
66 std::unique_ptr<IOBuf> Codec::uncompress(const IOBuf* data,
67 uint64_t uncompressedLength) {
68 if (uncompressedLength == UNKNOWN_UNCOMPRESSED_LENGTH) {
69 if (needsUncompressedLength()) {
70 throw std::invalid_argument("Codec: uncompressed length required");
72 } else if (uncompressedLength > maxUncompressedLength()) {
73 throw std::runtime_error("Codec: uncompressed length too large");
77 if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
78 uncompressedLength != 0) {
79 throw std::runtime_error("Codec: invalid uncompressed length");
81 return IOBuf::create(0);
84 return doUncompress(data, uncompressedLength);
// Public query used by uncompress(): when this returns true, uncompress()
// rejects UNKNOWN_UNCOMPRESSED_LENGTH. Forwards to the per-codec hook.
87 bool Codec::needsUncompressedLength() const {
88 return doNeedsUncompressedLength();
// Public query for the largest uncompressed length the codec accepts; used
// as the upper bound in compress() and uncompress(). Forwards to the
// per-codec hook.
91 uint64_t Codec::maxUncompressedLength() const {
92 return doMaxUncompressedLength();
// Default for the needs-length hook; codecs such as LZ4Codec override it.
// NOTE(review): the returned value is not visible in this excerpt.
95 bool Codec::doNeedsUncompressedLength() const {
// Default for the max-length hook: no limit unless a codec overrides it.
99 uint64_t Codec::doMaxUncompressedLength() const {
100 return UNLIMITED_UNCOMPRESSED_LENGTH;
// Codec that performs no compression: its doCompress()/doUncompress()
// definitions return a clone of the input chain.
108 class NoCompressionCodec final : public Codec {
// Factory with the (level, type) signature used by getCodec()'s table.
110 static std::unique_ptr<Codec> create(int level, CodecType type);
111 explicit NoCompressionCodec(int level, CodecType type);
// Codec hooks overridden by this class.
114 std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
115 std::unique_ptr<IOBuf> doUncompress(
117 uint64_t uncompressedLength) override;
// Factory: heap-allocates a NoCompressionCodec; the constructor validates
// `level` and may throw std::invalid_argument.
120 std::unique_ptr<Codec> NoCompressionCodec::create(int level, CodecType type) {
121 return make_unique<NoCompressionCodec>(level, type);
// Constructor. Debug-asserts that `type` is NO_COMPRESSION, recognizes the
// three symbolic levels (DEFAULT/FASTEST/BEST), and throws
// std::invalid_argument for a level it does not accept.
// NOTE(review): the value the symbolic levels map to and the exact rejection
// condition are not visible in this excerpt.
124 NoCompressionCodec::NoCompressionCodec(int level, CodecType type)
126 DCHECK(type == CodecType::NO_COMPRESSION);
128 case COMPRESSION_LEVEL_DEFAULT:
129 case COMPRESSION_LEVEL_FASTEST:
130 case COMPRESSION_LEVEL_BEST:
134 throw std::invalid_argument(to<std::string>(
135 "NoCompressionCodec: invalid level ", level));
// "Compression" for the no-op codec: returns a clone of the input chain.
139 std::unique_ptr<IOBuf> NoCompressionCodec::doCompress(
141 return data->clone();
144 std::unique_ptr<IOBuf> NoCompressionCodec::doUncompress(
146 uint64_t uncompressedLength) {
147 if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
148 data->computeChainDataLength() != uncompressedLength) {
149 throw std::runtime_error(to<std::string>(
150 "NoCompressionCodec: invalid uncompressed length"));
152 return data->clone();
155 #if (FOLLY_HAVE_LIBLZ4 || FOLLY_HAVE_LIBLZMA)
159 void encodeVarintToIOBuf(uint64_t val, folly::IOBuf* out) {
160 DCHECK_GE(out->tailroom(), kMaxVarintLength64);
161 out->append(encodeVarint(val, out->writableTail()));
164 inline uint64_t decodeVarintFromCursor(folly::io::Cursor& cursor) {
167 for (int shift = 0; shift <= 63; shift += 7) {
168 b = cursor.read<int8_t>();
169 val |= static_cast<uint64_t>(b & 0x7f) << shift;
175 throw std::invalid_argument("Invalid varint value. Too big.");
182 #endif // FOLLY_HAVE_LIBLZ4 || FOLLY_HAVE_LIBLZMA
184 #if FOLLY_HAVE_LIBLZ4
// LZ4 codec. Serves both CodecType::LZ4 and CodecType::LZ4_VARINT_SIZE; the
// latter stores the uncompressed length as a varint (see encodeSize()).
189 class LZ4Codec final : public Codec {
191 static std::unique_ptr<Codec> create(int level, CodecType type);
192 explicit LZ4Codec(int level, CodecType type);
195 bool doNeedsUncompressedLength() const override;
196 uint64_t doMaxUncompressedLength() const override;
// True when this instance is the varint-size-prefixed flavor.
198 bool encodeSize() const { return type() == CodecType::LZ4_VARINT_SIZE; }
200 std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
201 std::unique_ptr<IOBuf> doUncompress(
203 uint64_t uncompressedLength) override;
// Selects the HC (high compression) LZ4 entry points in doCompress().
205 bool highCompression_;
// Factory: heap-allocates an LZ4Codec; the constructor validates `level`.
208 std::unique_ptr<Codec> LZ4Codec::create(int level, CodecType type) {
209 return make_unique<LZ4Codec>(level, type);
// Constructor. Debug-asserts that `type` is one of the two LZ4 flavors,
// translates the symbolic levels, then requires the numeric level to be 1 or
// 2 (std::invalid_argument otherwise). Level 2 enables high compression.
// NOTE(review): the numeric values assigned to the symbolic levels are not
// visible in this excerpt.
212 LZ4Codec::LZ4Codec(int level, CodecType type) : Codec(type) {
213 DCHECK(type == CodecType::LZ4 || type == CodecType::LZ4_VARINT_SIZE);
216 case COMPRESSION_LEVEL_FASTEST:
217 case COMPRESSION_LEVEL_DEFAULT:
220 case COMPRESSION_LEVEL_BEST:
224 if (level < 1 || level > 2) {
225 throw std::invalid_argument(to<std::string>(
226 "LZ4Codec: invalid level: ", level));
228 highCompression_ = (level > 1);
// The caller must supply the uncompressed length unless this flavor stores
// it in the stream as a varint.
231 bool LZ4Codec::doNeedsUncompressedLength() const {
232 return !encodeSize();
235 // The value comes from lz4.h in lz4-r117, but older versions of lz4 don't
236 // define LZ4_MAX_INPUT_SIZE (even though the max size is the same), so do it
// Fallback definition, used only when the lz4 header did not provide one.
238 #ifndef LZ4_MAX_INPUT_SIZE
239 # define LZ4_MAX_INPUT_SIZE 0x7E000000
// Largest input LZ4 accepts; Codec::compress()/uncompress() enforce this.
242 uint64_t LZ4Codec::doMaxUncompressedLength() const {
243 return LZ4_MAX_INPUT_SIZE;
// Compresses `data` with a single LZ4 call.
//
// A chained input is cloned first so that it can be coalesced (see the
// comment below). The output buffer is sized for LZ4's worst case, plus room
// for a varint length prefix when encodeSize() is true.
// NOTE(review): the condition guarding the encodeVarintToIOBuf() call and
// the declaration of `n` are not visible in this excerpt.
246 std::unique_ptr<IOBuf> LZ4Codec::doCompress(const IOBuf* data) {
247 std::unique_ptr<IOBuf> clone;
248 if (data->isChained()) {
249 // LZ4 doesn't support streaming, so we have to coalesce
250 clone = data->clone();
// Worst-case output size, plus the optional varint prefix.
255 uint32_t extraSize = encodeSize() ? kMaxVarintLength64 : 0;
256 auto out = IOBuf::create(extraSize + LZ4_compressBound(data->length()));
258 encodeVarintToIOBuf(data->length(), out.get());
262 auto input = reinterpret_cast<const char*>(data->data());
263 auto output = reinterpret_cast<char*>(out->writableTail());
264 const auto inputLength = data->length();
// From version 10700 on, the LZ4 entry points take the output capacity
// explicitly; the older entry points below do not.
265 #if LZ4_VERSION_NUMBER >= 10700
266 if (highCompression_) {
267 n = LZ4_compress_HC(input, output, inputLength, out->tailroom(), 0);
269 n = LZ4_compress_default(input, output, inputLength, out->tailroom());
272 if (highCompression_) {
273 n = LZ4_compressHC(input, output, inputLength);
275 n = LZ4_compress(input, output, inputLength);
// Sanity check: LZ4 must not report more bytes than the buffer can hold.
280 CHECK_LE(n, out->capacity());
// Decompresses a single LZ4 block.
//
// The uncompressed length comes either from the varint prefix in the stream
// (checked against the caller's value when one is given) or from the caller;
// in the latter case it must be known and no larger than
// maxUncompressedLength(). Throws std::runtime_error on a length mismatch or
// when LZ4 does not produce exactly that many bytes.
// NOTE(review): the branch condition selecting between the two length
// sources is not visible in this excerpt; presumably encodeSize().
286 std::unique_ptr<IOBuf> LZ4Codec::doUncompress(
288 uint64_t uncompressedLength) {
289 std::unique_ptr<IOBuf> clone;
290 if (data->isChained()) {
291 // LZ4 doesn't support streaming, so we have to coalesce
292 clone = data->clone();
297 folly::io::Cursor cursor(data);
298 uint64_t actualUncompressedLength;
// Length stored in the stream: it must agree with the caller's, if given.
300 actualUncompressedLength = decodeVarintFromCursor(cursor);
301 if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
302 uncompressedLength != actualUncompressedLength) {
303 throw std::runtime_error("LZ4Codec: invalid uncompressed length");
// Length supplied by the caller: it must be known and within range.
306 actualUncompressedLength = uncompressedLength;
307 if (actualUncompressedLength == UNKNOWN_UNCOMPRESSED_LENGTH ||
308 actualUncompressedLength > maxUncompressedLength()) {
309 throw std::runtime_error("LZ4Codec: invalid uncompressed length");
// Whatever the cursor has not consumed yet is the LZ4 payload.
313 auto sp = StringPiece{cursor.peekBytes()};
314 auto out = IOBuf::create(actualUncompressedLength);
315 int n = LZ4_decompress_safe(
317 reinterpret_cast<char*>(out->writableTail()),
319 actualUncompressedLength);
// A negative result or a size other than the expected one is an error.
321 if (n < 0 || uint64_t(n) != actualUncompressedLength) {
322 throw std::runtime_error(to<std::string>(
323 "LZ4 decompression returned invalid value ", n));
325 out->append(actualUncompressedLength);
329 #endif // FOLLY_HAVE_LIBLZ4
331 #if FOLLY_HAVE_LIBSNAPPY
338 * Implementation of snappy::Source that reads from an IOBuf chain.
// Adapter exposing an IOBuf chain through the snappy::Source interface
// (Available/Peek/Skip), so snappy can read the chain directly.
340 class IOBufSnappySource final : public snappy::Source {
342 explicit IOBufSnappySource(const IOBuf* data);
343 size_t Available() const override;
344 const char* Peek(size_t* len) override;
345 void Skip(size_t n) override;
// Member definitions for IOBufSnappySource.
// NOTE(review): most bodies are not visible in this excerpt; the comments
// below describe only the visible lines.

// Constructor: starts with the full chain length available.
351 IOBufSnappySource::IOBufSnappySource(const IOBuf* data)
352 : available_(data->computeChainDataLength()),
// Available(): body not visible; presumably returns available_.
356 size_t IOBufSnappySource::Available() const {
// Peek(): views the bytes the cursor can expose without advancing.
360 const char* IOBufSnappySource::Peek(size_t* len) {
361 auto sp = StringPiece{cursor_.peekBytes()};
// Skip(): consuming more than what remains is a fatal error (CHECK).
366 void IOBufSnappySource::Skip(size_t n) {
367 CHECK_LE(n, available_);
// Snappy codec (CodecType::SNAPPY). Overrides the max-length hook because
// snappy uses 32-bit lengths.
372 class SnappyCodec final : public Codec {
374 static std::unique_ptr<Codec> create(int level, CodecType type);
375 explicit SnappyCodec(int level, CodecType type);
378 uint64_t doMaxUncompressedLength() const override;
379 std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
380 std::unique_ptr<IOBuf> doUncompress(
382 uint64_t uncompressedLength) override;
// Factory: heap-allocates a SnappyCodec; the constructor validates `level`.
385 std::unique_ptr<Codec> SnappyCodec::create(int level, CodecType type) {
386 return make_unique<SnappyCodec>(level, type);
// Constructor. Debug-asserts that `type` is SNAPPY, recognizes the three
// symbolic levels, and throws std::invalid_argument for an unsupported level.
// NOTE(review): the value the symbolic levels map to and the rejection
// condition are not visible in this excerpt.
389 SnappyCodec::SnappyCodec(int level, CodecType type) : Codec(type) {
390 DCHECK(type == CodecType::SNAPPY);
392 case COMPRESSION_LEVEL_FASTEST:
393 case COMPRESSION_LEVEL_DEFAULT:
394 case COMPRESSION_LEVEL_BEST:
398 throw std::invalid_argument(to<std::string>(
399 "SnappyCodec: invalid level: ", level));
// Upper bound on input size: the largest value a uint32_t can hold.
403 uint64_t SnappyCodec::doMaxUncompressedLength() const {
404 // snappy.h uses uint32_t for lengths, so there's that.
405 return std::numeric_limits<uint32_t>::max();
// Compresses the chain by streaming it through IOBufSnappySource into one
// output buffer sized with snappy::MaxCompressedLength().
408 std::unique_ptr<IOBuf> SnappyCodec::doCompress(const IOBuf* data) {
409 IOBufSnappySource source(data);
411 IOBuf::create(snappy::MaxCompressedLength(source.Available()));
// The sink performs no bounds checks; the sizing above is what keeps the
// write in range.
413 snappy::UncheckedByteArraySink sink(reinterpret_cast<char*>(
414 out->writableTail()));
416 size_t n = snappy::Compress(&source, &sink);
// Sanity check on the number of bytes snappy reports having written.
418 CHECK_LE(n, out->capacity());
// Uncompresses a snappy stream in two passes over the input: the first reads
// the uncompressed length stored in the stream, the second decodes into a
// buffer of exactly that size. Throws std::runtime_error when the length
// cannot be read, disagrees with the caller's value, or decoding fails.
423 std::unique_ptr<IOBuf> SnappyCodec::doUncompress(const IOBuf* data,
424 uint64_t uncompressedLength) {
425 uint32_t actualUncompressedLength = 0;
// Pass 1: a dedicated source is used because reading the length consumes
// input.
428 IOBufSnappySource source(data);
429 if (!snappy::GetUncompressedLength(&source, &actualUncompressedLength)) {
430 throw std::runtime_error("snappy::GetUncompressedLength failed");
432 if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
433 uncompressedLength != actualUncompressedLength) {
434 throw std::runtime_error("snappy: invalid uncompressed length");
438 auto out = IOBuf::create(actualUncompressedLength);
// Pass 2: a fresh source positioned at the start of the chain.
441 IOBufSnappySource source(data);
442 if (!snappy::RawUncompress(&source,
443 reinterpret_cast<char*>(out->writableTail()))) {
444 throw std::runtime_error("snappy::RawUncompress failed");
448 out->append(actualUncompressedLength);
452 #endif // FOLLY_HAVE_LIBSNAPPY
// zlib-based codec serving both CodecType::ZLIB and CodecType::GZIP; the
// type selects the windowBits passed to zlib's init functions.
458 class ZlibCodec final : public Codec {
460 static std::unique_ptr<Codec> create(int level, CodecType type);
461 explicit ZlibCodec(int level, CodecType type);
464 std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
465 std::unique_ptr<IOBuf> doUncompress(
467 uint64_t uncompressedLength) override;
// Helpers that manage the chain of output buffers while (de)compressing.
469 std::unique_ptr<IOBuf> addOutputBuffer(z_stream* stream, uint32_t length);
470 bool doInflate(z_stream* stream, IOBuf* head, uint32_t bufferLength);
// Factory: heap-allocates a ZlibCodec; the constructor validates `level`.
475 std::unique_ptr<Codec> ZlibCodec::create(int level, CodecType type) {
476 return make_unique<ZlibCodec>(level, type);
// Constructor. Debug-asserts that `type` is ZLIB or GZIP, translates the
// symbolic levels, then accepts only Z_DEFAULT_COMPRESSION or a level in
// [0, 9]; anything else throws std::invalid_argument.
// NOTE(review): the values assigned for FASTEST and BEST are not visible in
// this excerpt.
479 ZlibCodec::ZlibCodec(int level, CodecType type) : Codec(type) {
480 DCHECK(type == CodecType::ZLIB || type == CodecType::GZIP);
482 case COMPRESSION_LEVEL_FASTEST:
485 case COMPRESSION_LEVEL_DEFAULT:
486 level = Z_DEFAULT_COMPRESSION;
488 case COMPRESSION_LEVEL_BEST:
492 if (level != Z_DEFAULT_COMPRESSION && (level < 0 || level > 9)) {
493 throw std::invalid_argument(to<std::string>(
494 "ZlibCodec: invalid level: ", level));
// Allocates a new output IOBuf and points the z_stream's output fields at
// it. May only be called when the previous output buffer is completely full
// (avail_out == 0); this is enforced with a CHECK.
499 std::unique_ptr<IOBuf> ZlibCodec::addOutputBuffer(z_stream* stream,
501 CHECK_EQ(stream->avail_out, 0);
503 auto buf = IOBuf::create(length);
// zlib writes directly into the new buffer's storage.
506 stream->next_out = buf->writableData();
507 stream->avail_out = buf->length();
// Performs one inflate() step, first chaining a fresh output buffer onto
// `head` if the current one is full. Callers store the result in a flag
// named streamEnd, so the return value presumably reports end of stream.
// Throws std::runtime_error on an inflate error; a return code that is not
// expected at all is treated as fatal (CHECK(false)).
// NOTE(review): the switch over `rc` is not visible in this excerpt.
512 bool ZlibCodec::doInflate(z_stream* stream,
514 uint32_t bufferLength) {
515 if (stream->avail_out == 0) {
516 head->prependChain(addOutputBuffer(stream, bufferLength));
519 int rc = inflate(stream, Z_NO_FLUSH);
530 throw std::runtime_error(to<std::string>(
531 "ZlibCodec: inflate error: ", rc, ": ", stream->msg));
533 CHECK(false) << rc << ": " << stream->msg;
// Compresses an IOBuf chain with zlib's deflate, producing a chain of output
// buffers.
//
// Outline: initialize the stream (gzip framing when type() is GZIP), feed
// every buffer of the input chain through deflate(Z_NO_FLUSH), finish with
// deflate(Z_FINISH), and trim the unused tail of the last output buffer.
// Throws std::runtime_error if deflateInit2 fails; unexpected deflate()
// return codes are fatal (CHECK).
539 std::unique_ptr<IOBuf> ZlibCodec::doCompress(const IOBuf* data) {
// Null allocator hooks.
541 stream.zalloc = nullptr;
542 stream.zfree = nullptr;
543 stream.opaque = nullptr;
545 // Using deflateInit2() to support gzip. "The windowBits parameter is the
546 // base two logarithm of the maximum window size (...) The default value is
547 // 15 (...) Add 16 to windowBits to write a simple gzip header and trailer
548 // around the compressed data instead of a zlib wrapper. The gzip header
549 // will have no file name, no extra data, no comment, no modification time
550 // (set to zero), no header crc, and the operating system will be set to 255
552 int windowBits = 15 + (type() == CodecType::GZIP ? 16 : 0);
553 // All other parameters (method, memLevel, strategy) get default values from
555 int rc = deflateInit2(&stream,
562 throw std::runtime_error(to<std::string>(
563 "ZlibCodec: deflateInit error: ", rc, ": ", stream.msg));
// Start with empty input/output windows and zeroed totals.
566 stream.next_in = stream.next_out = nullptr;
567 stream.avail_in = stream.avail_out = 0;
568 stream.total_in = stream.total_out = 0;
// `success` lets the cleanup code tolerate Z_DATA_ERROR from deflateEnd()
// when the function is left before it is set to true.
570 bool success = false;
573 rc = deflateEnd(&stream);
574 // If we're here because of an exception, it's okay if some data
576 CHECK(rc == Z_OK || (!success && rc == Z_DATA_ERROR))
577 << rc << ": " << stream.msg;
580 uint64_t uncompressedLength = data->computeChainDataLength();
581 uint64_t maxCompressedLength = deflateBound(&stream, uncompressedLength);
583 // Max 64MiB in one go
584 constexpr uint32_t maxSingleStepLength = uint32_t(64) << 20; // 64MiB
585 constexpr uint32_t defaultBufferLength = uint32_t(4) << 20; // 4MiB
// First output buffer: the whole worst-case size when it fits in one step,
// otherwise a default-sized buffer (more are chained on demand).
587 auto out = addOutputBuffer(
589 (maxCompressedLength <= maxSingleStepLength ?
590 maxCompressedLength :
591 defaultBufferLength));
// Feed each input buffer to deflate in steps of at most maxSingleStepLength.
593 for (auto& range : *data) {
594 uint64_t remaining = range.size();
595 uint64_t written = 0;
597 uint32_t step = (remaining > maxSingleStepLength ?
598 maxSingleStepLength : remaining);
599 stream.next_in = const_cast<uint8_t*>(range.data() + written);
600 stream.avail_in = step;
604 while (stream.avail_in != 0) {
605 if (stream.avail_out == 0) {
606 out->prependChain(addOutputBuffer(&stream, defaultBufferLength));
609 rc = deflate(&stream, Z_NO_FLUSH);
611 CHECK_EQ(rc, Z_OK) << stream.msg;
// Flush: keep calling deflate(Z_FINISH), adding output space as needed,
// until it stops returning Z_OK.
617 if (stream.avail_out == 0) {
618 out->prependChain(addOutputBuffer(&stream, defaultBufferLength));
621 rc = deflate(&stream, Z_FINISH);
622 } while (rc == Z_OK);
624 CHECK_EQ(rc, Z_STREAM_END) << stream.msg;
// Drop the unused space at the end of the last buffer in the chain.
626 out->prev()->trimEnd(stream.avail_out);
628 success = true; // we survived
// Uncompresses a zlib or gzip stream spread over an IOBuf chain.
//
// Outline: initialize the inflate stream, push every input buffer through
// doInflate(), then keep inflating until the stream ends, trim the last
// output buffer, and verify the caller's expected length if one was given.
// Throws std::runtime_error if inflateInit2 fails, if input remains after
// the end of the compressed stream, or on a length mismatch.
633 std::unique_ptr<IOBuf> ZlibCodec::doUncompress(const IOBuf* data,
634 uint64_t uncompressedLength) {
636 stream.zalloc = nullptr;
637 stream.zfree = nullptr;
638 stream.opaque = nullptr;
640 // "The windowBits parameter is the base two logarithm of the maximum window
641 // size (...) The default value is 15 (...) add 16 to decode only the gzip
642 // format (the zlib format will return a Z_DATA_ERROR)."
643 int windowBits = 15 + (type() == CodecType::GZIP ? 16 : 0);
644 int rc = inflateInit2(&stream, windowBits);
646 throw std::runtime_error(to<std::string>(
647 "ZlibCodec: inflateInit error: ", rc, ": ", stream.msg));
650 stream.next_in = stream.next_out = nullptr;
651 stream.avail_in = stream.avail_out = 0;
652 stream.total_in = stream.total_out = 0;
// `success` lets the cleanup code tolerate Z_DATA_ERROR from inflateEnd()
// when the function is left before it is set to true.
654 bool success = false;
657 rc = inflateEnd(&stream);
658 // If we're here because of an exception, it's okay if some data
660 CHECK(rc == Z_OK || (!success && rc == Z_DATA_ERROR))
661 << rc << ": " << stream.msg;
664 // Max 64MiB in one go
665 constexpr uint32_t maxSingleStepLength = uint32_t(64) << 20; // 64MiB
666 constexpr uint32_t defaultBufferLength = uint32_t(4) << 20; // 4MiB
// First output buffer: sized from the expected length when it is known and
// small enough, otherwise a default-sized buffer.
668 auto out = addOutputBuffer(
670 ((uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
671 uncompressedLength <= maxSingleStepLength) ?
673 defaultBufferLength));
675 bool streamEnd = false;
676 for (auto& range : *data) {
681 stream.next_in = const_cast<uint8_t*>(range.data());
682 stream.avail_in = range.size();
684 while (stream.avail_in != 0) {
// Input left over after the compressed stream ended is an error.
686 throw std::runtime_error(to<std::string>(
687 "ZlibCodec: junk after end of data"));
690 streamEnd = doInflate(&stream, out.get(), defaultBufferLength);
// All input consumed: let inflate drain whatever output is still pending.
695 streamEnd = doInflate(&stream, out.get(), defaultBufferLength);
// Drop the unused space at the end of the last buffer in the chain.
698 out->prev()->trimEnd(stream.avail_out);
700 if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
701 uncompressedLength != stream.total_out) {
702 throw std::runtime_error(to<std::string>(
703 "ZlibCodec: invalid uncompressed length"));
706 success = true; // we survived
711 #endif // FOLLY_HAVE_LIBZ
713 #if FOLLY_HAVE_LIBLZMA
// LZMA2 codec. Serves both CodecType::LZMA2 and CodecType::LZMA2_VARINT_SIZE;
// the latter stores the uncompressed length as a varint (see encodeSize()).
718 class LZMA2Codec final : public Codec {
720 static std::unique_ptr<Codec> create(int level, CodecType type);
721 explicit LZMA2Codec(int level, CodecType type);
724 bool doNeedsUncompressedLength() const override;
725 uint64_t doMaxUncompressedLength() const override;
// True when this instance is the varint-size-prefixed flavor.
727 bool encodeSize() const { return type() == CodecType::LZMA2_VARINT_SIZE; }
729 std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
730 std::unique_ptr<IOBuf> doUncompress(
732 uint64_t uncompressedLength) override;
// Helpers that manage the chain of output buffers while (de)compressing.
734 std::unique_ptr<IOBuf> addOutputBuffer(lzma_stream* stream, size_t length);
735 bool doInflate(lzma_stream* stream, IOBuf* head, size_t bufferLength);
// Factory: heap-allocates an LZMA2Codec; the constructor validates `level`.
740 std::unique_ptr<Codec> LZMA2Codec::create(int level, CodecType type) {
741 return make_unique<LZMA2Codec>(level, type);
// Constructor. Debug-asserts that `type` is one of the two LZMA2 flavors,
// translates the symbolic levels (DEFAULT becomes LZMA_PRESET_DEFAULT), then
// requires the level to be in [0, 9]; otherwise throws
// std::invalid_argument.
// NOTE(review): the values assigned for FASTEST and BEST are not visible in
// this excerpt.
744 LZMA2Codec::LZMA2Codec(int level, CodecType type) : Codec(type) {
745 DCHECK(type == CodecType::LZMA2 || type == CodecType::LZMA2_VARINT_SIZE);
747 case COMPRESSION_LEVEL_FASTEST:
750 case COMPRESSION_LEVEL_DEFAULT:
751 level = LZMA_PRESET_DEFAULT;
753 case COMPRESSION_LEVEL_BEST:
757 if (level < 0 || level > 9) {
758 throw std::invalid_argument(to<std::string>(
759 "LZMA2Codec: invalid level: ", level));
// The caller must supply the uncompressed length unless this flavor stores
// it in the stream as a varint.
764 bool LZMA2Codec::doNeedsUncompressedLength() const {
765 return !encodeSize();
// Upper bound on input size: 2^63 bytes, per the liblzma note quoted below.
768 uint64_t LZMA2Codec::doMaxUncompressedLength() const {
769 // From lzma/base.h: "Stream is roughly 8 EiB (2^63 bytes)"
770 return uint64_t(1) << 63;
// Allocates a new output IOBuf and points the lzma_stream's output fields at
// it. May only be called when the previous output buffer is completely full
// (avail_out == 0); this is enforced with a CHECK.
773 std::unique_ptr<IOBuf> LZMA2Codec::addOutputBuffer(
777 CHECK_EQ(stream->avail_out, 0);
779 auto buf = IOBuf::create(length);
// liblzma writes directly into the new buffer's storage.
782 stream->next_out = buf->writableData();
783 stream->avail_out = buf->length();
// Compresses an IOBuf chain with liblzma's easy encoder, producing a chain
// of output buffers.
//
// Outline: initialize the encoder at level_, optionally put a varint length
// header in front of the output, run every input buffer through
// lzma_code(LZMA_RUN), finish with lzma_code(LZMA_FINISH), and trim the
// unused tail of the last output buffer. Throws std::runtime_error on any
// liblzma failure.
// NOTE(review): the condition guarding the varint header is not visible in
// this excerpt; presumably encodeSize().
788 std::unique_ptr<IOBuf> LZMA2Codec::doCompress(const IOBuf* data) {
790 lzma_stream stream = LZMA_STREAM_INIT;
792 rc = lzma_easy_encoder(&stream, level_, LZMA_CHECK_NONE);
794 throw std::runtime_error(folly::to<std::string>(
795 "LZMA2Codec: lzma_easy_encoder error: ", rc));
// Release liblzma's resources on every exit path.
798 SCOPE_EXIT { lzma_end(&stream); };
800 uint64_t uncompressedLength = data->computeChainDataLength();
801 uint64_t maxCompressedLength = lzma_stream_buffer_bound(uncompressedLength);
803 // Max 64MiB in one go
804 constexpr uint32_t maxSingleStepLength = uint32_t(64) << 20; // 64MiB
805 constexpr uint32_t defaultBufferLength = uint32_t(4) << 20; // 4MiB
// First output buffer: the whole worst-case size when it fits in one step,
// otherwise a default-sized buffer (more are chained on demand).
807 auto out = addOutputBuffer(
809 (maxCompressedLength <= maxSingleStepLength ?
810 maxCompressedLength :
811 defaultBufferLength));
// Put a small buffer holding the varint-encoded length at the head of the
// output chain, ahead of the compressed data.
814 auto size = IOBuf::createCombined(kMaxVarintLength64);
815 encodeVarintToIOBuf(uncompressedLength, size.get());
816 size->appendChain(std::move(out));
817 out = std::move(size);
820 for (auto& range : *data) {
825 stream.next_in = const_cast<uint8_t*>(range.data());
826 stream.avail_in = range.size();
828 while (stream.avail_in != 0) {
829 if (stream.avail_out == 0) {
830 out->prependChain(addOutputBuffer(&stream, defaultBufferLength));
833 rc = lzma_code(&stream, LZMA_RUN);
836 throw std::runtime_error(folly::to<std::string>(
837 "LZMA2Codec: lzma_code error: ", rc));
// Flush: keep calling lzma_code(LZMA_FINISH), adding output space as needed,
// until it stops returning LZMA_OK.
843 if (stream.avail_out == 0) {
844 out->prependChain(addOutputBuffer(&stream, defaultBufferLength));
847 rc = lzma_code(&stream, LZMA_FINISH);
848 } while (rc == LZMA_OK);
850 if (rc != LZMA_STREAM_END) {
851 throw std::runtime_error(folly::to<std::string>(
852 "LZMA2Codec: lzma_code ended with error: ", rc));
// Drop the unused space at the end of the last buffer in the chain.
855 out->prev()->trimEnd(stream.avail_out);
// Performs one lzma_code(LZMA_RUN) step, first chaining a fresh output
// buffer onto `head` if the current one is full. Callers store the result in
// a flag named streamEnd, so the return value presumably reports
// LZMA_STREAM_END. Throws std::runtime_error on a liblzma error.
// NOTE(review): the switch over `rc` is only partly visible in this excerpt.
860 bool LZMA2Codec::doInflate(lzma_stream* stream,
862 size_t bufferLength) {
863 if (stream->avail_out == 0) {
864 head->prependChain(addOutputBuffer(stream, bufferLength));
867 lzma_ret rc = lzma_code(stream, LZMA_RUN);
872 case LZMA_STREAM_END:
875 throw std::runtime_error(to<std::string>(
876 "LZMA2Codec: lzma_code error: ", rc));
// Uncompresses an LZMA2 stream spread over an IOBuf chain.
//
// The expected length comes either from the varint header in the stream
// (checked against the caller's value when one is given) or from the caller.
// The input is then pushed through doInflate() buffer by buffer, and the
// total output size must equal the expected length. Throws
// std::runtime_error on decoder-init failure, junk after the end of the
// stream, or a length mismatch.
// NOTE(review): the branch condition selecting between the two length
// sources is not visible in this excerpt; presumably encodeSize().
882 std::unique_ptr<IOBuf> LZMA2Codec::doUncompress(const IOBuf* data,
883 uint64_t uncompressedLength) {
885 lzma_stream stream = LZMA_STREAM_INIT;
// The memory-limit argument is set to the largest uint64_t value.
887 rc = lzma_auto_decoder(&stream, std::numeric_limits<uint64_t>::max(), 0);
889 throw std::runtime_error(folly::to<std::string>(
890 "LZMA2Codec: lzma_auto_decoder error: ", rc));
// Release liblzma's resources on every exit path.
893 SCOPE_EXIT { lzma_end(&stream); };
895 // Max 64MiB in one go
896 constexpr uint32_t maxSingleStepLength = uint32_t(64) << 20; // 64MiB
897 constexpr uint32_t defaultBufferLength = uint32_t(4) << 20; // 4MiB
899 folly::io::Cursor cursor(data);
900 uint64_t actualUncompressedLength;
// Length stored in the stream: it must agree with the caller's, if given.
902 actualUncompressedLength = decodeVarintFromCursor(cursor);
903 if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
904 uncompressedLength != actualUncompressedLength) {
905 throw std::runtime_error("LZMA2Codec: invalid uncompressed length");
// Length supplied by the caller; only debug-asserted to be known here.
908 actualUncompressedLength = uncompressedLength;
909 DCHECK_NE(actualUncompressedLength, UNKNOWN_UNCOMPRESSED_LENGTH);
// First output buffer: the full expected size when it fits in one step,
// otherwise a default-sized buffer.
912 auto out = addOutputBuffer(
914 (actualUncompressedLength <= maxSingleStepLength ?
915 actualUncompressedLength :
916 defaultBufferLength));
918 bool streamEnd = false;
// Walk the remaining input one contiguous piece at a time.
919 auto buf = cursor.peekBytes();
920 while (!buf.empty()) {
921 stream.next_in = const_cast<uint8_t*>(buf.data());
922 stream.avail_in = buf.size();
924 while (stream.avail_in != 0) {
// Input left over after the compressed stream ended is an error.
926 throw std::runtime_error(to<std::string>(
927 "LZMA2Codec: junk after end of data"));
930 streamEnd = doInflate(&stream, out.get(), defaultBufferLength);
933 cursor.skip(buf.size());
934 buf = cursor.peekBytes();
// All input consumed: let the decoder drain whatever output is pending.
938 streamEnd = doInflate(&stream, out.get(), defaultBufferLength);
// Drop the unused space at the end of the last buffer in the chain.
941 out->prev()->trimEnd(stream.avail_out);
943 if (actualUncompressedLength != stream.total_out) {
944 throw std::runtime_error(to<std::string>(
945 "LZMA2Codec: invalid uncompressed length"));
951 #endif // FOLLY_HAVE_LIBLZMA
953 #ifdef FOLLY_HAVE_LIBZSTD
958 class ZSTDCodec final : public Codec {
960 static std::unique_ptr<Codec> create(int level, CodecType);
961 explicit ZSTDCodec(int level, CodecType type);
964 bool doNeedsUncompressedLength() const override;
965 std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
966 std::unique_ptr<IOBuf> doUncompress(
968 uint64_t uncompressedLength) override;
// Factory: heap-allocates a ZSTDCodec; the constructor validates `level`.
973 std::unique_ptr<Codec> ZSTDCodec::create(int level, CodecType type) {
974 return make_unique<ZSTDCodec>(level, type);
// Constructor. Debug-asserts that `type` is ZSTD, translates the symbolic
// levels, then requires the level to be in [1, ZSTD_maxCLevel()]; otherwise
// throws std::invalid_argument.
// NOTE(review): the values assigned for the symbolic levels are not visible
// in this excerpt.
977 ZSTDCodec::ZSTDCodec(int level, CodecType type) : Codec(type) {
978 DCHECK(type == CodecType::ZSTD);
980 case COMPRESSION_LEVEL_FASTEST:
983 case COMPRESSION_LEVEL_DEFAULT:
986 case COMPRESSION_LEVEL_BEST:
990 if (level < 1 || level > ZSTD_maxCLevel()) {
991 throw std::invalid_argument(
992 to<std::string>("ZSTD: invalid level: ", level));
// Override of the needs-length hook for ZSTD.
// NOTE(review): the returned value is not visible in this excerpt;
// doUncompress() handles UNKNOWN_UNCOMPRESSED_LENGTH, which suggests false.
997 bool ZSTDCodec::doNeedsUncompressedLength() const {
1001 void zstdThrowIfError(size_t rc) {
1002 if (!ZSTD_isError(rc)) {
1005 throw std::runtime_error(
1006 to<std::string>("ZSTD returned an error: ", ZSTD_getErrorName(rc)));
// Compresses `data` with ZSTD.
//
// An unchained input is compressed with one ZSTD_compress() call (see the
// comment below for why). A chained input goes through the streaming API,
// buffer by buffer, into one output buffer sized with ZSTD_compressBound().
// Any ZSTD error becomes std::runtime_error through zstdThrowIfError().
1009 std::unique_ptr<IOBuf> ZSTDCodec::doCompress(const IOBuf* data) {
1010 // Support earlier versions of the codec (working with a single IOBuf,
1011 // and using ZSTD_decompress which requires ZSTD frame to contain size,
1012 // which isn't populated by streaming API).
1013 if (!data->isChained()) {
1014 auto out = IOBuf::createCombined(ZSTD_compressBound(data->length()));
1015 const auto rc = ZSTD_compress(
1016 out->writableData(),
1021 zstdThrowIfError(rc);
// Streaming path for chained input.
1026 auto zcs = ZSTD_createCStream();
// NOTE(review): the construct that schedules this free call is not visible
// in this excerpt.
1028 ZSTD_freeCStream(zcs);
1031 auto rc = ZSTD_initCStream(zcs, level_);
1032 zstdThrowIfError(rc);
1034 Cursor cursor(data);
1035 auto result = IOBuf::createCombined(ZSTD_compressBound(cursor.totalLength()));
// The whole result buffer serves as the streaming output window.
1038 out.dst = result->writableTail();
1039 out.size = result->capacity();
// Feed one contiguous piece of input at a time until the cursor is drained.
1042 for (auto buffer = cursor.peekBytes(); !buffer.empty();) {
1044 in.src = buffer.data();
1045 in.size = buffer.size();
1046 for (in.pos = 0; in.pos != in.size;) {
1047 rc = ZSTD_compressStream(zcs, &out, &in);
1048 zstdThrowIfError(rc);
1050 cursor.skip(in.size);
1051 buffer = cursor.peekBytes();
// Finish the stream.
1054 rc = ZSTD_endStream(zcs, &out);
1055 zstdThrowIfError(rc);
// Extend the result's data region over the bytes ZSTD produced.
1058 result->append(out.pos);
// Uncompresses a ZSTD stream spread over an IOBuf chain, using the streaming
// API and collecting the output in an IOBufQueue.
//
// Throws std::runtime_error (directly or through zstdThrowIfError()) on a
// ZSTD error, truncated input, trailing bytes after the end of the stream,
// or when the result size differs from a caller-supplied length.
// NOTE(review): the loop header and several branch conditions are not
// visible in this excerpt.
1062 std::unique_ptr<IOBuf> ZSTDCodec::doUncompress(
1064 uint64_t uncompressedLength) {
1065 auto zds = ZSTD_createDStream();
// NOTE(review): the construct that schedules this free call is not visible
// in this excerpt.
1067 ZSTD_freeDStream(zds);
1070 auto rc = ZSTD_initDStream(zds);
1071 zstdThrowIfError(rc);
1073 ZSTD_outBuffer out{};
// Size of the first output allocation: the caller's length when known;
// otherwise ZSTD's recommended size, lowered to the size recorded in the
// frame when that is non-zero and smaller.
1076 auto outputSize = ZSTD_DStreamOutSize();
1077 if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH) {
1078 outputSize = uncompressedLength;
1080 auto decompressedSize =
1081 ZSTD_getDecompressedSize(data->data(), data->length());
1082 if (decompressedSize != 0 && decompressedSize < outputSize) {
1083 outputSize = decompressedSize;
1087 IOBufQueue queue(IOBufQueue::cacheChainLength());
1089 Cursor cursor(data);
// Refill the input window from the cursor once it has been consumed.
1091 if (in.pos == in.size) {
1092 auto buffer = cursor.peekBytes();
1093 in.src = buffer.data();
1094 in.size = buffer.size();
1096 cursor.skip(in.size);
// Out of input while the last ZSTD return value was still greater than 1.
1097 if (rc > 1 && in.size == 0) {
1098 throw std::runtime_error(to<std::string>("ZSTD: incomplete input"));
// Output window full: commit it to the queue and allocate a new one.
1101 if (out.pos == out.size) {
1103 queue.postallocate(out.pos);
1105 auto buffer = queue.preallocate(outputSize, outputSize);
1106 out.dst = buffer.first;
1107 out.size = buffer.second;
// Later allocations use ZSTD's recommended output size.
1109 outputSize = ZSTD_DStreamOutSize();
1111 rc = ZSTD_decompressStream(zds, &out, &in);
1112 zstdThrowIfError(rc);
// Commit what was written into the final output window.
1118 queue.postallocate(out.pos);
1120 if (in.pos != in.size || !cursor.isAtEnd()) {
1121 throw std::runtime_error("ZSTD: junk after end of data");
1123 if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
1124 queue.chainLength() != uncompressedLength) {
1125 throw std::runtime_error("ZSTD: invalid uncompressed length");
1128 return queue.move();
1131 #endif // FOLLY_HAVE_LIBZSTD
// Creates a codec of the requested type and level.
//
// Looks `type` up in a static table of factory functions indexed by the
// numeric value of CodecType. Throws std::invalid_argument when the value is
// out of range or its factory is unavailable (NOTE(review): the null check
// on `factory` is not visible in this excerpt).
1135 std::unique_ptr<Codec> getCodec(CodecType type, int level) {
1136 typedef std::unique_ptr<Codec> (*CodecFactory)(int, CodecType);
// One slot per CodecType value; entries depend on which compression
// libraries were enabled at build time.
1138 static CodecFactory codecFactories[
1139 static_cast<size_t>(CodecType::NUM_CODEC_TYPES)] = {
1140 nullptr, // USER_DEFINED
1141 NoCompressionCodec::create,
1143 #if FOLLY_HAVE_LIBLZ4
1149 #if FOLLY_HAVE_LIBSNAPPY
1150 SnappyCodec::create,
1161 #if FOLLY_HAVE_LIBLZ4
1167 #if FOLLY_HAVE_LIBLZMA
1175 #if FOLLY_HAVE_LIBZSTD
// Reject values that would index past the end of the table.
1188 size_t idx = static_cast<size_t>(type);
1189 if (idx >= static_cast<size_t>(CodecType::NUM_CODEC_TYPES)) {
1190 throw std::invalid_argument(to<std::string>(
1191 "Compression type ", idx, " not supported"));
1193 auto factory = codecFactories[idx];
1195 throw std::invalid_argument(to<std::string>(
1196 "Compression type ", idx, " not supported"));
1198 auto codec = (*factory)(level, type);
// Debug-only check that the factory built a codec of the requested type.
1199 DCHECK_EQ(static_cast<size_t>(codec->type()), idx);