2 * Copyright 2016 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #include <folly/io/Compression.h>
24 #include <glog/logging.h>
26 #if FOLLY_HAVE_LIBSNAPPY
28 #include <snappy-sinksource.h>
35 #if FOLLY_HAVE_LIBLZMA
39 #if FOLLY_HAVE_LIBZSTD
43 #include <folly/Conv.h>
44 #include <folly/Memory.h>
45 #include <folly/Portability.h>
46 #include <folly/ScopeGuard.h>
47 #include <folly/Varint.h>
48 #include <folly/io/Cursor.h>
50 namespace folly { namespace io {
52 Codec::Codec(CodecType type) : type_(type) { }
54 // Ensure consistent behavior in the nullptr case
// Public compression entry point. Measures the whole chain, returns an empty
// IOBuf on the short-circuit branch (presumably for zero-length input -- TODO
// confirm), throws std::runtime_error when the input is longer than
// maxUncompressedLength(), and otherwise hands off to the codec-specific
// doCompress().
// NOTE(review): `data` is dereferenced by the very first statement, so a null
// pointer is not tolerated here; the "nullptr case" wording in the comment
// preceding this function presumably refers to empty input -- confirm.
55 std::unique_ptr<IOBuf> Codec::compress(const IOBuf* data) {
56 uint64_t len = data->computeChainDataLength();
58 return IOBuf::create(0);
59 } else if (len > maxUncompressedLength()) {
60 throw std::runtime_error("Codec: uncompressed length too large");
63 return doCompress(data);
// Public decompression entry point; validates `uncompressedLength` before
// delegating to the codec-specific doUncompress().
//  - UNKNOWN_UNCOMPRESSED_LENGTH is rejected with std::invalid_argument when
//    the codec reports needsUncompressedLength().
//  - A known length above maxUncompressedLength() throws std::runtime_error.
//  - On the short-circuit branch (presumably empty input -- TODO confirm) only
//    an unknown or zero length is accepted and an empty IOBuf is returned.
66 std::unique_ptr<IOBuf> Codec::uncompress(const IOBuf* data,
67 uint64_t uncompressedLength) {
68 if (uncompressedLength == UNKNOWN_UNCOMPRESSED_LENGTH) {
69 if (needsUncompressedLength()) {
70 throw std::invalid_argument("Codec: uncompressed length required");
72 } else if (uncompressedLength > maxUncompressedLength()) {
73 throw std::runtime_error("Codec: uncompressed length too large");
// A caller-supplied length must be either unknown or exactly zero here.
77 if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
78 uncompressedLength != 0) {
79 throw std::runtime_error("Codec: invalid uncompressed length");
81 return IOBuf::create(0);
84 return doUncompress(data, uncompressedLength);
// Public accessor; forwards to the overridable doNeedsUncompressedLength().
87 bool Codec::needsUncompressedLength() const {
88 return doNeedsUncompressedLength();
// Public accessor; forwards to the overridable doMaxUncompressedLength().
91 uint64_t Codec::maxUncompressedLength() const {
92 return doMaxUncompressedLength();
// Base-class implementation of the hook behind needsUncompressedLength().
// Codecs such as LZ4Codec and LZMA2Codec override it.
95 bool Codec::doNeedsUncompressedLength() const {
// Base-class implementation of the hook behind maxUncompressedLength():
// reports UNLIMITED_UNCOMPRESSED_LENGTH unless a codec overrides it.
99 uint64_t Codec::doMaxUncompressedLength() const {
100 return UNLIMITED_UNCOMPRESSED_LENGTH;
// Pass-through codec for CodecType::NO_COMPRESSION: both directions hand back
// a clone of the input chain.
108 class NoCompressionCodec final : public Codec {
110 static std::unique_ptr<Codec> create(int level, CodecType type);
111 explicit NoCompressionCodec(int level, CodecType type);
114 std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
115 std::unique_ptr<IOBuf> doUncompress(
117 uint64_t uncompressedLength) override;
// Factory with the (level, type) signature expected by getCodec()'s table.
120 std::unique_ptr<Codec> NoCompressionCodec::create(int level, CodecType type) {
121 return make_unique<NoCompressionCodec>(level, type);
// Validates the arguments. `type` must be NO_COMPRESSION (debug check only).
// The three symbolic levels are accepted; the std::invalid_argument below is
// presumably raised for every other level -- TODO confirm.
124 NoCompressionCodec::NoCompressionCodec(int level, CodecType type)
126 DCHECK(type == CodecType::NO_COMPRESSION);
128 case COMPRESSION_LEVEL_DEFAULT:
129 case COMPRESSION_LEVEL_FASTEST:
130 case COMPRESSION_LEVEL_BEST:
134 throw std::invalid_argument(to<std::string>(
135 "NoCompressionCodec: invalid level ", level));
// "Compression" is a no-op: returns a clone of the input chain.
139 std::unique_ptr<IOBuf> NoCompressionCodec::doCompress(
141 return data->clone();
// Returns a clone of the input. When the caller supplied a length, it must
// equal the chain's total data length or std::runtime_error is thrown.
144 std::unique_ptr<IOBuf> NoCompressionCodec::doUncompress(
146 uint64_t uncompressedLength) {
147 if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
148 data->computeChainDataLength() != uncompressedLength) {
149 throw std::runtime_error(to<std::string>(
150 "NoCompressionCodec: invalid uncompressed length"));
152 return data->clone();
155 #if (FOLLY_HAVE_LIBLZ4 || FOLLY_HAVE_LIBLZMA)
// Appends `val` as a varint at the tail of `out`. The caller must have left at
// least kMaxVarintLength64 bytes of tailroom; this is checked in debug builds
// only.
159 void encodeVarintToIOBuf(uint64_t val, folly::IOBuf* out) {
160 DCHECK_GE(out->tailroom(), kMaxVarintLength64);
// encodeVarint() returns the number of bytes written, which becomes the
// amount appended.
161 out->append(encodeVarint(val, out->writableTail()));
// Reads a varint from `cursor`, advancing it one byte per iteration. Each byte
// contributes its low 7 bits at the current shift (0, 7, ..., 63).
// Throws std::invalid_argument when the encoding is too long. Presumably the
// loop stops at the first byte whose high bit is clear -- TODO confirm.
164 inline uint64_t decodeVarintFromCursor(folly::io::Cursor& cursor) {
167 for (int shift = 0; shift <= 63; shift += 7) {
168 b = cursor.read<int8_t>();
169 val |= static_cast<uint64_t>(b & 0x7f) << shift;
175 throw std::invalid_argument("Invalid varint value. Too big.");
182 #endif // FOLLY_HAVE_LIBLZ4 || FOLLY_HAVE_LIBLZMA
184 #if FOLLY_HAVE_LIBLZ4
// LZ4 codec serving both CodecType::LZ4 and CodecType::LZ4_VARINT_SIZE. The
// latter prefixes the compressed data with the uncompressed length as a
// varint (see encodeSize()).
189 class LZ4Codec final : public Codec {
191 static std::unique_ptr<Codec> create(int level, CodecType type);
192 explicit LZ4Codec(int level, CodecType type);
195 bool doNeedsUncompressedLength() const override;
196 uint64_t doMaxUncompressedLength() const override;
// True when the uncompressed size is stored in-band.
198 bool encodeSize() const { return type() == CodecType::LZ4_VARINT_SIZE; }
200 std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
201 std::unique_ptr<IOBuf> doUncompress(
203 uint64_t uncompressedLength) override;
// Selects LZ4_compressHC over LZ4_compress in doCompress().
205 bool highCompression_;
// Factory with the (level, type) signature expected by getCodec()'s table.
208 std::unique_ptr<Codec> LZ4Codec::create(int level, CodecType type) {
209 return make_unique<LZ4Codec>(level, type);
// Maps the symbolic levels onto concrete ones (the assignments are presumably
// in the case bodies -- TODO confirm). The resulting level must be 1 or 2,
// otherwise std::invalid_argument is thrown; level 2 enables high compression.
212 LZ4Codec::LZ4Codec(int level, CodecType type) : Codec(type) {
213 DCHECK(type == CodecType::LZ4 || type == CodecType::LZ4_VARINT_SIZE);
216 case COMPRESSION_LEVEL_FASTEST:
217 case COMPRESSION_LEVEL_DEFAULT:
220 case COMPRESSION_LEVEL_BEST:
224 if (level < 1 || level > 2) {
225 throw std::invalid_argument(to<std::string>(
226 "LZ4Codec: invalid level: ", level));
228 highCompression_ = (level > 1);
// The caller must supply the uncompressed length unless it is stored in-band
// (LZ4_VARINT_SIZE).
231 bool LZ4Codec::doNeedsUncompressedLength() const {
232 return !encodeSize();
235 // The value comes from lz4.h in lz4-r117, but older versions of lz4 don't
236 // define LZ4_MAX_INPUT_SIZE (even though the max size is the same), so do it
238 #ifndef LZ4_MAX_INPUT_SIZE
239 # define LZ4_MAX_INPUT_SIZE 0x7E000000
// Largest input LZ4 accepts: LZ4_MAX_INPUT_SIZE (defined as 0x7E000000 by this
// file when lz4.h does not provide it).
242 uint64_t LZ4Codec::doMaxUncompressedLength() const {
243 return LZ4_MAX_INPUT_SIZE;
// Compresses `data` with LZ4 in one shot. Chained input is cloned first so it
// can be made contiguous. The output buffer is sized from LZ4_compressBound()
// plus room for the optional varint length prefix. highCompression_ picks
// LZ4_compressHC over LZ4_compress.
246 std::unique_ptr<IOBuf> LZ4Codec::doCompress(const IOBuf* data) {
247 std::unique_ptr<IOBuf> clone;
248 if (data->isChained()) {
249 // LZ4 doesn't support streaming, so we have to coalesce
250 clone = data->clone();
// Reserve space for the length prefix only when the codec type encodes it.
255 uint32_t extraSize = encodeSize() ? kMaxVarintLength64 : 0;
256 auto out = IOBuf::create(extraSize + LZ4_compressBound(data->length()));
258 encodeVarintToIOBuf(data->length(), out.get());
262 if (highCompression_) {
263 n = LZ4_compressHC(reinterpret_cast<const char*>(data->data()),
264 reinterpret_cast<char*>(out->writableTail()),
267 n = LZ4_compress(reinterpret_cast<const char*>(data->data()),
268 reinterpret_cast<char*>(out->writableTail()),
// Sanity check: the compressor must not report more bytes than the buffer
// holds.
273 CHECK_LE(n, out->capacity());
// Decompresses LZ4 data in one shot. Chained input is cloned first so it can
// be made contiguous. The expected output size comes either from the varint
// prefix (which must agree with any caller-supplied length) or from the
// caller; a missing or over-limit length throws std::runtime_error.
// LZ4_decompress_safe() must produce exactly that many bytes, otherwise
// std::runtime_error is thrown.
279 std::unique_ptr<IOBuf> LZ4Codec::doUncompress(
281 uint64_t uncompressedLength) {
282 std::unique_ptr<IOBuf> clone;
283 if (data->isChained()) {
284 // LZ4 doesn't support streaming, so we have to coalesce
285 clone = data->clone();
290 folly::io::Cursor cursor(data);
291 uint64_t actualUncompressedLength;
// In-band length: decode it and cross-check against the caller's value.
293 actualUncompressedLength = decodeVarintFromCursor(cursor);
294 if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
295 uncompressedLength != actualUncompressedLength) {
296 throw std::runtime_error("LZ4Codec: invalid uncompressed length");
// Out-of-band length: it must be known and within the LZ4 limit.
299 actualUncompressedLength = uncompressedLength;
300 if (actualUncompressedLength == UNKNOWN_UNCOMPRESSED_LENGTH ||
301 actualUncompressedLength > maxUncompressedLength()) {
302 throw std::runtime_error("LZ4Codec: invalid uncompressed length");
// Whatever the cursor still points at is the compressed payload.
306 auto p = cursor.peek();
307 auto out = IOBuf::create(actualUncompressedLength);
308 int n = LZ4_decompress_safe(reinterpret_cast<const char*>(p.first),
309 reinterpret_cast<char*>(out->writableTail()),
311 actualUncompressedLength);
313 if (n < 0 || uint64_t(n) != actualUncompressedLength) {
314 throw std::runtime_error(to<std::string>(
315 "LZ4 decompression returned invalid value ", n));
317 out->append(actualUncompressedLength);
321 #endif // FOLLY_HAVE_LIBLZ4
323 #if FOLLY_HAVE_LIBSNAPPY
330 * Implementation of snappy::Source that reads from a IOBuf chain.
// Adapter that lets snappy read from an IOBuf chain through the
// snappy::Source interface (Available / Peek / Skip).
332 class IOBufSnappySource final : public snappy::Source {
334 explicit IOBufSnappySource(const IOBuf* data);
335 size_t Available() const override;
336 const char* Peek(size_t* len) override;
337 void Skip(size_t n) override;
// Starts with the whole chain's data length available for reading.
343 IOBufSnappySource::IOBufSnappySource(const IOBuf* data)
344 : available_(data->computeChainDataLength()),
// snappy::Source override: number of bytes left to read (presumably the
// available_ counter -- TODO confirm).
348 size_t IOBufSnappySource::Available() const {
// snappy::Source override: returns a pointer to the contiguous bytes at the
// cursor's current position without consuming them. `*len` presumably
// receives the size of that region -- TODO confirm.
352 const char* IOBufSnappySource::Peek(size_t* len) {
353 auto p = cursor_.peek();
355 return reinterpret_cast<const char*>(p.first);
// snappy::Source override: consumes `n` bytes. Asking for more than is
// available is a fatal CHECK failure.
358 void IOBufSnappySource::Skip(size_t n) {
359 CHECK_LE(n, available_);
// Snappy codec (CodecType::SNAPPY). Reads input through IOBufSnappySource, so
// chained buffers are supported; lengths are capped at uint32_t's maximum.
364 class SnappyCodec final : public Codec {
366 static std::unique_ptr<Codec> create(int level, CodecType type);
367 explicit SnappyCodec(int level, CodecType type);
370 uint64_t doMaxUncompressedLength() const override;
371 std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
372 std::unique_ptr<IOBuf> doUncompress(
374 uint64_t uncompressedLength) override;
// Factory with the (level, type) signature expected by getCodec()'s table.
377 std::unique_ptr<Codec> SnappyCodec::create(int level, CodecType type) {
378 return make_unique<SnappyCodec>(level, type);
// Validates the arguments. `type` must be SNAPPY (debug check only). The
// three symbolic levels are accepted; the std::invalid_argument below is
// presumably raised for every other level -- TODO confirm.
381 SnappyCodec::SnappyCodec(int level, CodecType type) : Codec(type) {
382 DCHECK(type == CodecType::SNAPPY);
384 case COMPRESSION_LEVEL_FASTEST:
385 case COMPRESSION_LEVEL_DEFAULT:
386 case COMPRESSION_LEVEL_BEST:
390 throw std::invalid_argument(to<std::string>(
391 "SnappyCodec: invalid level: ", level));
// Largest input accepted: the maximum value of uint32_t.
395 uint64_t SnappyCodec::doMaxUncompressedLength() const {
396 // snappy.h uses uint32_t for lengths, so there's that.
397 return std::numeric_limits<uint32_t>::max();
// Compresses the whole chain into one buffer sized by
// snappy::MaxCompressedLength(). The sink writes straight into the buffer's
// tail without bounds checks, so that sizing is what keeps the write in range.
400 std::unique_ptr<IOBuf> SnappyCodec::doCompress(const IOBuf* data) {
401 IOBufSnappySource source(data);
403 IOBuf::create(snappy::MaxCompressedLength(source.Available()));
405 snappy::UncheckedByteArraySink sink(reinterpret_cast<char*>(
406 out->writableTail()));
408 size_t n = snappy::Compress(&source, &sink);
// Sanity check on the number of bytes snappy reports having written.
410 CHECK_LE(n, out->capacity());
// Decompresses in two passes over the input, each with a fresh source.
// Pass 1 reads the uncompressed length stored in the snappy stream and checks
// it against any caller-supplied length. Pass 2 decompresses into a buffer of
// exactly that size. Any failure throws std::runtime_error.
415 std::unique_ptr<IOBuf> SnappyCodec::doUncompress(const IOBuf* data,
416 uint64_t uncompressedLength) {
417 uint32_t actualUncompressedLength = 0;
// Pass 1: discover and validate the length.
420 IOBufSnappySource source(data);
421 if (!snappy::GetUncompressedLength(&source, &actualUncompressedLength)) {
422 throw std::runtime_error("snappy::GetUncompressedLength failed");
424 if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
425 uncompressedLength != actualUncompressedLength) {
426 throw std::runtime_error("snappy: invalid uncompressed length");
430 auto out = IOBuf::create(actualUncompressedLength);
// Pass 2: decompress from the start of the input.
433 IOBufSnappySource source(data);
434 if (!snappy::RawUncompress(&source,
435 reinterpret_cast<char*>(out->writableTail()))) {
436 throw std::runtime_error("snappy::RawUncompress failed");
440 out->append(actualUncompressedLength);
444 #endif // FOLLY_HAVE_LIBSNAPPY
// zlib-based codec serving both CodecType::ZLIB and CodecType::GZIP. It
// streams through z_stream and grows the output as a chain of buffers.
450 class ZlibCodec final : public Codec {
452 static std::unique_ptr<Codec> create(int level, CodecType type);
453 explicit ZlibCodec(int level, CodecType type);
456 std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
457 std::unique_ptr<IOBuf> doUncompress(
459 uint64_t uncompressedLength) override;
// Helpers: attach a fresh output buffer to the stream / run one inflate step.
461 std::unique_ptr<IOBuf> addOutputBuffer(z_stream* stream, uint32_t length);
462 bool doInflate(z_stream* stream, IOBuf* head, uint32_t bufferLength);
// Factory with the (level, type) signature expected by getCodec()'s table.
467 std::unique_ptr<Codec> ZlibCodec::create(int level, CodecType type) {
468 return make_unique<ZlibCodec>(level, type);
// Translates the symbolic levels into zlib levels (DEFAULT maps to
// Z_DEFAULT_COMPRESSION). The result must be Z_DEFAULT_COMPRESSION or within
// 0..9, otherwise std::invalid_argument is thrown.
471 ZlibCodec::ZlibCodec(int level, CodecType type) : Codec(type) {
472 DCHECK(type == CodecType::ZLIB || type == CodecType::GZIP);
474 case COMPRESSION_LEVEL_FASTEST:
477 case COMPRESSION_LEVEL_DEFAULT:
478 level = Z_DEFAULT_COMPRESSION;
480 case COMPRESSION_LEVEL_BEST:
484 if (level != Z_DEFAULT_COMPRESSION && (level < 0 || level > 9)) {
485 throw std::invalid_argument(to<std::string>(
486 "ZlibCodec: invalid level: ", level));
// Allocates a new output buffer and points the stream's next_out/avail_out at
// it. It may only be called once the previous output space is exhausted
// (avail_out == 0); anything else is a fatal CHECK failure.
491 std::unique_ptr<IOBuf> ZlibCodec::addOutputBuffer(z_stream* stream,
493 CHECK_EQ(stream->avail_out, 0);
495 auto buf = IOBuf::create(length);
498 stream->next_out = buf->writableData();
499 stream->avail_out = buf->length();
// Runs one inflate() step, first chaining a fresh output buffer onto `head`
// if the stream has no output space left. inflate errors surface as
// std::runtime_error; a return code outside the handled set is a fatal CHECK
// failure. The bool result is assigned to `streamEnd` by doUncompress().
504 bool ZlibCodec::doInflate(z_stream* stream,
506 uint32_t bufferLength) {
507 if (stream->avail_out == 0) {
508 head->prependChain(addOutputBuffer(stream, bufferLength));
511 int rc = inflate(stream, Z_NO_FLUSH);
522 throw std::runtime_error(to<std::string>(
523 "ZlibCodec: inflate error: ", rc, ": ", stream->msg));
525 CHECK(false) << rc << ": " << stream->msg;
// Streams the input chain through zlib's deflate and returns the result as an
// IOBuf chain.
//  - windowBits is 15, plus 16 for GZIP so zlib emits a gzip wrapper.
//  - The first output buffer is sized to deflateBound() when that is at most
//    64MiB, otherwise to 4MiB; later buffers are 4MiB each.
//  - Each input buffer is fed in steps of at most 64MiB.
// Throws std::runtime_error if deflateInit2 fails. Unexpected return codes
// from deflate()/deflateEnd() are fatal CHECK failures.
531 std::unique_ptr<IOBuf> ZlibCodec::doCompress(const IOBuf* data) {
533 stream.zalloc = nullptr;
534 stream.zfree = nullptr;
535 stream.opaque = nullptr;
537 // Using deflateInit2() to support gzip. "The windowBits parameter is the
538 // base two logarithm of the maximum window size (...) The default value is
539 // 15 (...) Add 16 to windowBits to write a simple gzip header and trailer
540 // around the compressed data instead of a zlib wrapper. The gzip header
541 // will have no file name, no extra data, no comment, no modification time
542 // (set to zero), no header crc, and the operating system will be set to 255
544 int windowBits = 15 + (type() == CodecType::GZIP ? 16 : 0);
545 // All other parameters (method, memLevel, strategy) get default values from
547 int rc = deflateInit2(&stream,
554 throw std::runtime_error(to<std::string>(
555 "ZlibCodec: deflateInit error: ", rc, ": ", stream.msg));
// Start from a clean stream: no input, no output space, zeroed counters.
558 stream.next_in = stream.next_out = nullptr;
559 stream.avail_in = stream.avail_out = 0;
560 stream.total_in = stream.total_out = 0;
// `success` lets the cleanup below tolerate Z_DATA_ERROR from deflateEnd()
// when it runs because of an exception.
562 bool success = false;
565 int rc = deflateEnd(&stream);
566 // If we're here because of an exception, it's okay if some data
568 CHECK(rc == Z_OK || (!success && rc == Z_DATA_ERROR))
569 << rc << ": " << stream.msg;
572 uint64_t uncompressedLength = data->computeChainDataLength();
573 uint64_t maxCompressedLength = deflateBound(&stream, uncompressedLength);
575 // Max 64MiB in one go
576 constexpr uint32_t maxSingleStepLength = uint32_t(64) << 20; // 64MiB
577 constexpr uint32_t defaultBufferLength = uint32_t(4) << 20; // 4MiB
579 auto out = addOutputBuffer(
581 (maxCompressedLength <= maxSingleStepLength ?
582 maxCompressedLength :
583 defaultBufferLength));
// Feed every buffer of the input chain, at most maxSingleStepLength at a time.
585 for (auto& range : *data) {
586 uint64_t remaining = range.size();
587 uint64_t written = 0;
589 uint32_t step = (remaining > maxSingleStepLength ?
590 maxSingleStepLength : remaining);
591 stream.next_in = const_cast<uint8_t*>(range.data() + written);
592 stream.avail_in = step;
596 while (stream.avail_in != 0) {
597 if (stream.avail_out == 0) {
598 out->prependChain(addOutputBuffer(&stream, defaultBufferLength));
601 rc = deflate(&stream, Z_NO_FLUSH);
603 CHECK_EQ(rc, Z_OK) << stream.msg;
// Flush: keep calling deflate(Z_FINISH), adding output space as needed, while
// it returns Z_OK.
609 if (stream.avail_out == 0) {
610 out->prependChain(addOutputBuffer(&stream, defaultBufferLength));
613 rc = deflate(&stream, Z_FINISH);
614 } while (rc == Z_OK);
616 CHECK_EQ(rc, Z_STREAM_END) << stream.msg;
// Drop the unused tail of the last buffer in the chain.
618 out->prev()->trimEnd(stream.avail_out);
620 success = true; // we survived
// Streams the input chain through zlib's inflate and returns the result as an
// IOBuf chain.
//  - windowBits is 15, plus 16 for GZIP so only the gzip format is accepted.
//  - The first output buffer uses the caller's length when it is known and at
//    most 64MiB, otherwise 4MiB; later buffers are 4MiB each.
// Throws std::runtime_error if inflateInit2 fails, if input remains after the
// stream has ended, or if a caller-supplied length differs from the number of
// bytes produced.
625 std::unique_ptr<IOBuf> ZlibCodec::doUncompress(const IOBuf* data,
626 uint64_t uncompressedLength) {
628 stream.zalloc = nullptr;
629 stream.zfree = nullptr;
630 stream.opaque = nullptr;
632 // "The windowBits parameter is the base two logarithm of the maximum window
633 // size (...) The default value is 15 (...) add 16 to decode only the gzip
634 // format (the zlib format will return a Z_DATA_ERROR)."
635 int windowBits = 15 + (type() == CodecType::GZIP ? 16 : 0);
636 int rc = inflateInit2(&stream, windowBits);
638 throw std::runtime_error(to<std::string>(
639 "ZlibCodec: inflateInit error: ", rc, ": ", stream.msg));
// Start from a clean stream: no input, no output space, zeroed counters.
642 stream.next_in = stream.next_out = nullptr;
643 stream.avail_in = stream.avail_out = 0;
644 stream.total_in = stream.total_out = 0;
// `success` lets the cleanup below tolerate Z_DATA_ERROR from inflateEnd()
// when it runs because of an exception.
646 bool success = false;
649 int rc = inflateEnd(&stream);
650 // If we're here because of an exception, it's okay if some data
652 CHECK(rc == Z_OK || (!success && rc == Z_DATA_ERROR))
653 << rc << ": " << stream.msg;
656 // Max 64MiB in one go
657 constexpr uint32_t maxSingleStepLength = uint32_t(64) << 20; // 64MiB
658 constexpr uint32_t defaultBufferLength = uint32_t(4) << 20; // 4MiB
660 auto out = addOutputBuffer(
662 ((uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
663 uncompressedLength <= maxSingleStepLength) ?
665 defaultBufferLength));
667 bool streamEnd = false;
// Feed every buffer of the input chain to inflate.
668 for (auto& range : *data) {
673 stream.next_in = const_cast<uint8_t*>(range.data());
674 stream.avail_in = range.size();
676 while (stream.avail_in != 0) {
678 throw std::runtime_error(to<std::string>(
679 "ZlibCodec: junk after end of data"));
682 streamEnd = doInflate(&stream, out.get(), defaultBufferLength);
687 streamEnd = doInflate(&stream, out.get(), defaultBufferLength);
// Drop the unused tail of the last buffer in the chain.
690 out->prev()->trimEnd(stream.avail_out);
692 if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
693 uncompressedLength != stream.total_out) {
694 throw std::runtime_error(to<std::string>(
695 "ZlibCodec: invalid uncompressed length"));
698 success = true; // we survived
703 #endif // FOLLY_HAVE_LIBZ
705 #if FOLLY_HAVE_LIBLZMA
// liblzma-based codec serving CodecType::LZMA2 and
// CodecType::LZMA2_VARINT_SIZE. The latter prefixes the compressed data with
// the uncompressed length as a varint (see encodeSize()).
710 class LZMA2Codec final : public Codec {
712 static std::unique_ptr<Codec> create(int level, CodecType type);
713 explicit LZMA2Codec(int level, CodecType type);
716 bool doNeedsUncompressedLength() const override;
717 uint64_t doMaxUncompressedLength() const override;
// True when the uncompressed size is stored in-band.
719 bool encodeSize() const { return type() == CodecType::LZMA2_VARINT_SIZE; }
721 std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
722 std::unique_ptr<IOBuf> doUncompress(
724 uint64_t uncompressedLength) override;
// Helpers: attach a fresh output buffer to the stream / run one decode step.
726 std::unique_ptr<IOBuf> addOutputBuffer(lzma_stream* stream, size_t length);
727 bool doInflate(lzma_stream* stream, IOBuf* head, size_t bufferLength);
// Factory with the (level, type) signature expected by getCodec()'s table.
732 std::unique_ptr<Codec> LZMA2Codec::create(int level, CodecType type) {
733 return make_unique<LZMA2Codec>(level, type);
// Translates the symbolic levels into LZMA presets (DEFAULT maps to
// LZMA_PRESET_DEFAULT). The result must lie within 0..9, otherwise
// std::invalid_argument is thrown.
736 LZMA2Codec::LZMA2Codec(int level, CodecType type) : Codec(type) {
737 DCHECK(type == CodecType::LZMA2 || type == CodecType::LZMA2_VARINT_SIZE);
739 case COMPRESSION_LEVEL_FASTEST:
742 case COMPRESSION_LEVEL_DEFAULT:
743 level = LZMA_PRESET_DEFAULT;
745 case COMPRESSION_LEVEL_BEST:
749 if (level < 0 || level > 9) {
750 throw std::invalid_argument(to<std::string>(
751 "LZMA2Codec: invalid level: ", level));
// The caller must supply the uncompressed length unless it is stored in-band
// (LZMA2_VARINT_SIZE).
756 bool LZMA2Codec::doNeedsUncompressedLength() const {
757 return !encodeSize();
// Largest input accepted: 2^63 bytes.
760 uint64_t LZMA2Codec::doMaxUncompressedLength() const {
761 // From lzma/base.h: "Stream is roughly 8 EiB (2^63 bytes)"
762 return uint64_t(1) << 63;
// Allocates a new output buffer and points the stream's next_out/avail_out at
// it. It may only be called once the previous output space is exhausted
// (avail_out == 0); anything else is a fatal CHECK failure.
765 std::unique_ptr<IOBuf> LZMA2Codec::addOutputBuffer(
769 CHECK_EQ(stream->avail_out, 0);
771 auto buf = IOBuf::create(length);
774 stream->next_out = buf->writableData();
775 stream->avail_out = buf->length();
// Streams the input chain through lzma_easy_encoder (preset level_, no
// integrity check) and returns the result as an IOBuf chain.
//  - The first output buffer is sized to lzma_stream_buffer_bound() when that
//    is at most 64MiB, otherwise to 4MiB; later buffers are 4MiB each.
//  - A varint-encoded uncompressed length is placed in front of the output
//    (presumably only when encodeSize() is true -- TODO confirm).
// Throws std::runtime_error on any encoder error.
780 std::unique_ptr<IOBuf> LZMA2Codec::doCompress(const IOBuf* data) {
782 lzma_stream stream = LZMA_STREAM_INIT;
784 rc = lzma_easy_encoder(&stream, level_, LZMA_CHECK_NONE);
786 throw std::runtime_error(folly::to<std::string>(
787 "LZMA2Codec: lzma_easy_encoder error: ", rc));
// Release the encoder on every exit path.
790 SCOPE_EXIT { lzma_end(&stream); };
792 uint64_t uncompressedLength = data->computeChainDataLength();
793 uint64_t maxCompressedLength = lzma_stream_buffer_bound(uncompressedLength);
795 // Max 64MiB in one go
796 constexpr uint32_t maxSingleStepLength = uint32_t(64) << 20; // 64MiB
797 constexpr uint32_t defaultBufferLength = uint32_t(4) << 20; // 4MiB
799 auto out = addOutputBuffer(
801 (maxCompressedLength <= maxSingleStepLength ?
802 maxCompressedLength :
803 defaultBufferLength));
// Build the length prefix in its own small buffer and make it the chain head.
806 auto size = IOBuf::createCombined(kMaxVarintLength64);
807 encodeVarintToIOBuf(uncompressedLength, size.get());
808 size->appendChain(std::move(out));
809 out = std::move(size);
// Feed every buffer of the input chain to the encoder.
812 for (auto& range : *data) {
817 stream.next_in = const_cast<uint8_t*>(range.data());
818 stream.avail_in = range.size();
820 while (stream.avail_in != 0) {
821 if (stream.avail_out == 0) {
822 out->prependChain(addOutputBuffer(&stream, defaultBufferLength));
825 rc = lzma_code(&stream, LZMA_RUN);
828 throw std::runtime_error(folly::to<std::string>(
829 "LZMA2Codec: lzma_code error: ", rc));
// Flush: keep calling lzma_code(LZMA_FINISH), adding output space as needed,
// while it returns LZMA_OK.
835 if (stream.avail_out == 0) {
836 out->prependChain(addOutputBuffer(&stream, defaultBufferLength));
839 rc = lzma_code(&stream, LZMA_FINISH);
840 } while (rc == LZMA_OK);
842 if (rc != LZMA_STREAM_END) {
843 throw std::runtime_error(folly::to<std::string>(
844 "LZMA2Codec: lzma_code ended with error: ", rc));
// Drop the unused tail of the last buffer in the chain.
847 out->prev()->trimEnd(stream.avail_out);
// Runs one lzma_code(LZMA_RUN) step, first chaining a fresh output buffer
// onto `head` if the stream has no output space left. Decoder errors surface
// as std::runtime_error. The bool result is assigned to `streamEnd` by
// doUncompress(); presumably it is true on LZMA_STREAM_END -- TODO confirm.
852 bool LZMA2Codec::doInflate(lzma_stream* stream,
854 size_t bufferLength) {
855 if (stream->avail_out == 0) {
856 head->prependChain(addOutputBuffer(stream, bufferLength));
859 lzma_ret rc = lzma_code(stream, LZMA_RUN);
864 case LZMA_STREAM_END:
867 throw std::runtime_error(to<std::string>(
868 "LZMA2Codec: lzma_code error: ", rc));
// Streams the input through lzma_auto_decoder (memory limit set to the
// maximum uint64_t) and returns the result as an IOBuf chain.
//  - The expected length comes from the varint prefix, which must agree with
//    any caller-supplied length, or from the caller.
//  - The first output buffer uses that length when it is at most 64MiB,
//    otherwise 4MiB; later buffers are 4MiB each.
// Throws std::runtime_error on a decoder error, if input remains after the
// stream has ended, or if the number of bytes produced differs from the
// expected length.
874 std::unique_ptr<IOBuf> LZMA2Codec::doUncompress(const IOBuf* data,
875 uint64_t uncompressedLength) {
877 lzma_stream stream = LZMA_STREAM_INIT;
879 rc = lzma_auto_decoder(&stream, std::numeric_limits<uint64_t>::max(), 0);
881 throw std::runtime_error(folly::to<std::string>(
882 "LZMA2Codec: lzma_auto_decoder error: ", rc));
// Release the decoder on every exit path.
885 SCOPE_EXIT { lzma_end(&stream); };
887 // Max 64MiB in one go
888 constexpr uint32_t maxSingleStepLength = uint32_t(64) << 20; // 64MiB
889 constexpr uint32_t defaultBufferLength = uint32_t(4) << 20; // 4MiB
891 folly::io::Cursor cursor(data);
892 uint64_t actualUncompressedLength;
// In-band length: decode it and cross-check against the caller's value.
894 actualUncompressedLength = decodeVarintFromCursor(cursor);
895 if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
896 uncompressedLength != actualUncompressedLength) {
897 throw std::runtime_error("LZMA2Codec: invalid uncompressed length");
// Out-of-band length: only a debug check guards against an unknown length.
900 actualUncompressedLength = uncompressedLength;
901 DCHECK_NE(actualUncompressedLength, UNKNOWN_UNCOMPRESSED_LENGTH);
904 auto out = addOutputBuffer(
906 (actualUncompressedLength <= maxSingleStepLength ?
907 actualUncompressedLength :
908 defaultBufferLength));
910 bool streamEnd = false;
// Walk the remaining input one contiguous region at a time.
911 auto buf = cursor.peek();
912 while (buf.second != 0) {
913 stream.next_in = const_cast<uint8_t*>(buf.first);
914 stream.avail_in = buf.second;
916 while (stream.avail_in != 0) {
918 throw std::runtime_error(to<std::string>(
919 "LZMA2Codec: junk after end of data"));
922 streamEnd = doInflate(&stream, out.get(), defaultBufferLength);
925 cursor.skip(buf.second);
930 streamEnd = doInflate(&stream, out.get(), defaultBufferLength);
// Drop the unused tail of the last buffer in the chain.
933 out->prev()->trimEnd(stream.avail_out);
935 if (actualUncompressedLength != stream.total_out) {
936 throw std::runtime_error(to<std::string>(
937 "LZMA2Codec: invalid uncompressed length"));
943 #endif // FOLLY_HAVE_LIBLZMA
945 #ifdef FOLLY_HAVE_LIBZSTD
948 * ZSTD_BETA compression
// Codec for CodecType::ZSTD_BETA built on zstd's one-shot
// ZSTD_compress / ZSTD_decompress calls.
950 class ZSTDCodec final : public Codec {
952 static std::unique_ptr<Codec> create(int level, CodecType);
953 explicit ZSTDCodec(int level, CodecType type);
956 bool doNeedsUncompressedLength() const override;
957 std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
958 std::unique_ptr<IOBuf> doUncompress(
960 uint64_t uncompressedLength) override;
// Factory with the (level, type) signature expected by getCodec()'s table.
965 std::unique_ptr<Codec> ZSTDCodec::create(int level, CodecType type) {
966 return make_unique<ZSTDCodec>(level, type);
// `type` must be ZSTD_BETA (debug check only). The three symbolic levels are
// handled case by case.
// NOTE(review): no range check or throw for other levels is visible here,
// unlike the sibling codecs -- confirm how an explicit numeric level is
// treated.
969 ZSTDCodec::ZSTDCodec(int level, CodecType type) : Codec(type) {
970 DCHECK(type == CodecType::ZSTD_BETA);
972 case COMPRESSION_LEVEL_FASTEST:
975 case COMPRESSION_LEVEL_DEFAULT:
978 case COMPRESSION_LEVEL_BEST:
// Override of the Codec hook. doUncompress() sizes its output buffer directly
// from the caller-supplied length, so this presumably reports true -- TODO
// confirm.
984 bool ZSTDCodec::doNeedsUncompressedLength() const {
988 std::unique_ptr<IOBuf> ZSTDCodec::doCompress(const IOBuf* data) {
990 size_t maxCompressedLength = ZSTD_compressBound(data->length());
991 auto out = IOBuf::createCombined(maxCompressedLength);
993 CHECK_EQ(out->length(), 0);
995 rc = ZSTD_compress(out->writableTail(),
1001 if (ZSTD_isError(rc)) {
1002 throw std::runtime_error(to<std::string>(
1003 "ZSTD compression returned an error: ",
1004 ZSTD_getErrorName(rc)));
1008 CHECK_EQ(out->length(), rc);
1013 std::unique_ptr<IOBuf> ZSTDCodec::doUncompress(const IOBuf* data,
1014 uint64_t uncompressedLength) {
1016 auto out = IOBuf::createCombined(uncompressedLength);
1018 CHECK_GE(out->capacity(), uncompressedLength);
1019 CHECK_EQ(out->length(), 0);
1021 rc = ZSTD_decompress(
1022 out->writableTail(), out->capacity(), data->data(), data->length());
1024 if (ZSTD_isError(rc)) {
1025 throw std::runtime_error(to<std::string>(
1026 "ZSTD decompression returned an error: ",
1027 ZSTD_getErrorName(rc)));
1031 CHECK_EQ(out->length(), rc);
1036 #endif // FOLLY_HAVE_LIBZSTD
// Creates the codec for `type` at the given `level`.
// Lookup goes through a static table of factory functions indexed by the
// numeric value of CodecType. Entries for optional libraries sit behind the
// corresponding FOLLY_HAVE_* guards.
// Throws std::invalid_argument when `type` is out of range, and again with the
// same message after the table lookup (presumably when the entry is null --
// TODO confirm). A debug check verifies that the created codec reports the
// requested type.
1040 std::unique_ptr<Codec> getCodec(CodecType type, int level) {
1041 typedef std::unique_ptr<Codec> (*CodecFactory)(int, CodecType);
1043 static CodecFactory codecFactories[
1044 static_cast<size_t>(CodecType::NUM_CODEC_TYPES)] = {
1045 nullptr, // USER_DEFINED
1046 NoCompressionCodec::create,
1048 #if FOLLY_HAVE_LIBLZ4
1054 #if FOLLY_HAVE_LIBSNAPPY
1055 SnappyCodec::create,
1066 #if FOLLY_HAVE_LIBLZ4
1072 #if FOLLY_HAVE_LIBLZMA
1080 #if FOLLY_HAVE_LIBZSTD
// Bounds-check the enum value before using it as a table index.
1093 size_t idx = static_cast<size_t>(type);
1094 if (idx >= static_cast<size_t>(CodecType::NUM_CODEC_TYPES)) {
1095 throw std::invalid_argument(to<std::string>(
1096 "Compression type ", idx, " not supported"));
1098 auto factory = codecFactories[idx];
1100 throw std::invalid_argument(to<std::string>(
1101 "Compression type ", idx, " not supported"));
1103 auto codec = (*factory)(level, type);
1104 DCHECK_EQ(static_cast<size_t>(codec->type()), idx);