2 * Copyright 2017 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #include <folly/io/Compression.h>
23 #include <unordered_map>
26 #include <boost/noncopyable.hpp>
27 #include <glog/logging.h>
29 #include <folly/Benchmark.h>
30 #include <folly/Hash.h>
31 #include <folly/Memory.h>
32 #include <folly/Random.h>
33 #include <folly/Varint.h>
34 #include <folly/io/IOBufQueue.h>
35 #include <folly/portability/GTest.h>
37 #if FOLLY_HAVE_LIBZSTD
42 #include <folly/io/compression/Zlib.h>
45 namespace zlib = folly::io::zlib;
// Base class for the test inputs: owns a heap byte buffer and hands out
// prefixes of it together with their hashes. Copying is disabled through the
// private boost::noncopyable base.
51 class DataHolder : private boost::noncopyable {
// Hash of the first `size` bytes of the buffer.
53 uint64_t hash(size_t size) const;
// Read-only view of the first `size` bytes of the buffer.
54 ByteRange data(size_t size) const;
// Allocates a buffer of 2^sizeLog2 bytes.
57 explicit DataHolder(size_t sizeLog2);
// The owned byte buffer.
59 std::unique_ptr<uint8_t[]> data_;
// Consulted by hash(); mutable so that the const hash() may touch it.
60 mutable std::unordered_map<uint64_t, uint64_t> hashCache_;
// Allocates 2^sizeLog2 bytes with `new uint8_t[]` (no value-initialization).
// data_ is sized from size_, so this relies on size_ being initialized first
// -- NOTE(review): confirm size_ is declared before data_ in the class.
63 DataHolder::DataHolder(size_t sizeLog2)
64 : size_(size_t(1) << sizeLog2),
65 data_(new uint8_t[size_]) {
// Returns the FNV-64 hash of the first `size` bytes of the buffer.
68 uint64_t DataHolder::hash(size_t size) const {
// Callers may not ask for more bytes than the buffer holds.
69 CHECK_LE(size, size_);
// Look in the per-size cache before hashing.
70 auto p = hashCache_.find(size);
71 if (p != hashCache_.end()) {
75 uint64_t h = folly::hash::fnv64_buf(data_.get(), size);
80 ByteRange DataHolder::data(size_t size) const {
81 CHECK_LE(size, size_);
82 return ByteRange(data_.get(), size);
85 uint64_t hashIOBuf(const IOBuf* buf) {
86 uint64_t h = folly::hash::FNV_64_HASH_START;
87 for (auto& range : *buf) {
88 h = folly::hash::fnv64_buf(range.data(), range.size(), h);
// DataHolder whose constructor writes pseudo-random bytes into the buffer.
93 class RandomDataHolder : public DataHolder {
// sizeLog2 is forwarded to DataHolder (buffer of 2^sizeLog2 bytes).
95 explicit RandomDataHolder(size_t sizeLog2);
98 RandomDataHolder::RandomDataHolder(size_t sizeLog2)
99 : DataHolder(sizeLog2) {
100 static constexpr size_t numThreadsLog2 = 3;
101 static constexpr size_t numThreads = size_t(1) << numThreadsLog2;
103 uint32_t seed = randomNumberSeed();
105 std::vector<std::thread> threads;
106 threads.reserve(numThreads);
107 for (size_t t = 0; t < numThreads; ++t) {
108 threads.emplace_back([this, seed, t, sizeLog2] {
109 std::mt19937 rng(seed + t);
110 size_t countLog2 = sizeLog2 - numThreadsLog2;
111 size_t start = size_t(t) << countLog2;
112 for (size_t i = 0; i < countLog2; ++i) {
113 this->data_[start + i] = rng();
118 for (auto& t : threads) {
// DataHolder whose constructor fills the buffer with a single repeated byte.
123 class ConstantDataHolder : public DataHolder {
// sizeLog2 is forwarded to DataHolder (buffer of 2^sizeLog2 bytes).
125 explicit ConstantDataHolder(size_t sizeLog2);
// Sets every one of the size_ bytes in the buffer to 'a'.
128 ConstantDataHolder::ConstantDataHolder(size_t sizeLog2)
129 : DataHolder(sizeLog2) {
130 memset(data_.get(), 'a', size_);
// Shared inputs used by the tests below. Each holder allocates
// 2^dataSizeLog2 bytes when it is constructed.
133 constexpr size_t dataSizeLog2 = 27; // 128MiB
134 RandomDataHolder randomDataHolder(dataSizeLog2);
135 ConstantDataHolder constantDataHolder(dataSizeLog2);
137 // The intersection of the provided codecs & those that are compiled in.
// The result is a freshly built vector; `v` is taken by const reference.
138 static std::vector<CodecType> supportedCodecs(std::vector<CodecType> const& v) {
139 std::vector<CodecType> supported;
// Selected entries are appended to `supported` through a back_inserter.
144 std::back_inserter(supported),
150 // All compiled-in compression codecs.
151 static std::vector<CodecType> availableCodecs() {
152 std::vector<CodecType> codecs;
154 for (size_t i = 0; i < static_cast<size_t>(CodecType::NUM_CODEC_TYPES); ++i) {
155 auto type = static_cast<CodecType>(i);
156 if (hasCodec(type)) {
157 codecs.push_back(type);
164 static std::vector<CodecType> availableStreamCodecs() {
165 std::vector<CodecType> codecs;
167 for (size_t i = 0; i < static_cast<size_t>(CodecType::NUM_CODEC_TYPES); ++i) {
168 auto type = static_cast<CodecType>(i);
169 if (hasStreamCodec(type)) {
170 codecs.push_back(type);
// Checks Codec::needsUncompressedLength() against a fixed expectation table.
177 TEST(CompressionTestNeedsUncompressedLength, Simple) {
178 static const struct {
180 bool needsUncompressedLength;
// In this table only plain LZ4 is expected to need the length from the caller.
182 {CodecType::NO_COMPRESSION, false},
183 {CodecType::LZ4, true},
184 {CodecType::SNAPPY, false},
185 {CodecType::ZLIB, false},
186 {CodecType::LZ4_VARINT_SIZE, false},
187 {CodecType::LZMA2, false},
188 {CodecType::LZMA2_VARINT_SIZE, false},
189 {CodecType::ZSTD, false},
190 {CodecType::GZIP, false},
191 {CodecType::LZ4_FRAME, false},
192 {CodecType::BZIP2, false},
195 for (auto const& test : expectations) {
// Codecs that are not available in this build are skipped.
196 if (hasCodec(test.type)) {
197 EXPECT_EQ(getCodec(test.type)->needsUncompressedLength(),
198 test.needsUncompressedLength);
// Fixture for one-shot compress/uncompress round trips. The test parameter is
// a tuple of (log2 of the uncompressed length, chunk count, codec type).
203 class CompressionTest
204 : public testing::TestWithParam<std::tr1::tuple<int, int, CodecType>> {
// Unpacks the parameter tuple into the fixture members.
206 void SetUp() override {
207 auto tup = GetParam();
208 uncompressedLength_ = uint64_t(1) << std::tr1::get<0>(tup);
209 chunks_ = std::tr1::get<1>(tup);
210 codec_ = getCodec(std::tr1::get<2>(tup));
// Round trip through the IOBuf-based compress/uncompress API.
213 void runSimpleIOBufTest(const DataHolder& dh);
// Round trip through the std::string-based compress/uncompress API.
215 void runSimpleStringTest(const DataHolder& dh);
// Re-chunks `data` into a chain (see the definition below).
218 std::unique_ptr<IOBuf> split(std::unique_ptr<IOBuf> data) const;
220 uint64_t uncompressedLength_;
222 std::unique_ptr<Codec> codec_;
// Compresses the first uncompressedLength_ bytes of `dh`, uncompresses the
// result and checks the output's total length and hash against the input.
225 void CompressionTest::runSimpleIOBufTest(const DataHolder& dh) {
// Both the input and the compressed data are passed through split().
226 const auto original = split(IOBuf::wrapBuffer(dh.data(uncompressedLength_)));
227 const auto compressed = split(codec_->compress(original.get()));
// Uncompress without a length only when the codec does not require one.
228 if (!codec_->needsUncompressedLength()) {
229 auto uncompressed = codec_->uncompress(compressed.get());
230 EXPECT_EQ(uncompressedLength_, uncompressed->computeChainDataLength());
231 EXPECT_EQ(dh.hash(uncompressedLength_), hashIOBuf(uncompressed.get()));
// Uncompress with the expected length supplied.
234 auto uncompressed = codec_->uncompress(compressed.get(),
235 uncompressedLength_);
236 EXPECT_EQ(uncompressedLength_, uncompressed->computeChainDataLength());
237 EXPECT_EQ(dh.hash(uncompressedLength_), hashIOBuf(uncompressed.get()));
// Same round trip as runSimpleIOBufTest, but through the std::string
// overloads of compress()/uncompress(); the output is compared to the input
// string directly.
241 void CompressionTest::runSimpleStringTest(const DataHolder& dh) {
// Copy the first uncompressedLength_ bytes of `dh` into a std::string.
242 const auto original = std::string(
243 reinterpret_cast<const char*>(dh.data(uncompressedLength_).data()),
244 uncompressedLength_);
245 const auto compressed = codec_->compress(original);
// Uncompress without a length only when the codec does not require one.
246 if (!codec_->needsUncompressedLength()) {
247 auto uncompressed = codec_->uncompress(compressed);
248 EXPECT_EQ(uncompressedLength_, uncompressed.length());
249 EXPECT_EQ(uncompressed, original);
// Uncompress with the expected length supplied.
252 auto uncompressed = codec_->uncompress(compressed, uncompressedLength_);
253 EXPECT_EQ(uncompressedLength_, uncompressed.length());
254 EXPECT_EQ(uncompressed, original);
258 // Uniformly split data into (potentially empty) chunks.
// Returns a new chain built from copies of the pieces of `data`.
259 std::unique_ptr<IOBuf> CompressionTest::split(
260 std::unique_ptr<IOBuf> data) const {
261 if (data->isChained()) {
265 const size_t size = data->computeChainDataLength();
// chunks_ - 1 random cut points. A multiset iterates in sorted order and
// keeps duplicates, which is what allows empty chunks.
267 std::multiset<size_t> splits;
268 for (size_t i = 1; i < chunks_; ++i) {
269 splits.insert(Random::rand64(size));
272 folly::IOBufQueue result;
// One copied buffer per cut point, covering [offset, split).
275 for (size_t split : splits) {
276 result.append(IOBuf::copyBuffer(data->data() + offset, split - offset));
// The final piece runs from the last offset to the end of the data.
279 result.append(IOBuf::copyBuffer(data->data() + offset, size - offset));
281 return result.move();
// IOBuf round trip on the random input.
284 TEST_P(CompressionTest, RandomData) {
285 runSimpleIOBufTest(randomDataHolder);
// IOBuf round trip on the constant input.
288 TEST_P(CompressionTest, ConstantData) {
289 runSimpleIOBufTest(constantDataHolder);
// std::string round trip on the random input.
292 TEST_P(CompressionTest, RandomDataString) {
293 runSimpleStringTest(randomDataHolder);
// std::string round trip on the constant input.
296 TEST_P(CompressionTest, ConstantDataString) {
297 runSimpleStringTest(constantDataHolder);
// Parameter grid for CompressionTest.
300 INSTANTIATE_TEST_CASE_P(
// log2 of the uncompressed length (SetUp() computes 1 << value).
304 testing::Values(0, 1, 12, 22, 25, 27),
// Chunk counts (stored in chunks_ by SetUp()).
305 testing::Values(1, 2, 3, 8, 65),
// Every codec that hasCodec() reports as available.
306 testing::ValuesIn(availableCodecs())));
// Fixture parameterized on (log2 of the uncompressed length, codec type).
308 class CompressionVarintTest
309 : public testing::TestWithParam<std::tr1::tuple<int, CodecType>> {
// Unpacks the parameter tuple into the fixture members.
311 void SetUp() override {
312 auto tup = GetParam();
313 uncompressedLength_ = uint64_t(1) << std::tr1::get<0>(tup);
314 codec_ = getCodec(std::tr1::get<1>(tup));
// Round trip with the compressed data split across two buffers.
317 void runSimpleTest(const DataHolder& dh);
319 uint64_t uncompressedLength_;
320 std::unique_ptr<Codec> codec_;
// Returns the one-based position of the most significant set bit of `number`,
// i.e. how many times it can be shifted right before it reaches zero.
// Returns 0 for an input of 0.
inline uint64_t oneBasedMsbPos(uint64_t number) {
  uint64_t bits = 0;
  while (number != 0) {
    number >>= 1;
    ++bits;
  }
  return bits;
}
// Round trip where the compressed data is re-laid out as a two-buffer chain
// before it is uncompressed (without passing a length).
330 void CompressionVarintTest::runSimpleTest(const DataHolder& dh) {
331 auto original = IOBuf::wrapBuffer(dh.data(uncompressedLength_));
332 auto compressed = codec_->compress(original.get());
336 std::max(uint64_t(9), oneBasedMsbPos(uncompressedLength_)) / 9UL);
// Copy the first breakPoint bytes (at most the buffer length) into their own
// buffer, drop them from the original, and chain the remainder behind it.
337 auto tinyBuf = IOBuf::copyBuffer(compressed->data(),
338 std::min(compressed->length(), breakPoint));
339 compressed->trimStart(breakPoint);
340 tinyBuf->prependChain(std::move(compressed));
341 compressed = std::move(tinyBuf);
343 auto uncompressed = codec_->uncompress(compressed.get());
345 EXPECT_EQ(uncompressedLength_, uncompressed->computeChainDataLength());
346 EXPECT_EQ(dh.hash(uncompressedLength_), hashIOBuf(uncompressed.get()));
// Varint round trip on the random input.
349 TEST_P(CompressionVarintTest, RandomData) {
350 runSimpleTest(randomDataHolder);
// Varint round trip on the constant input.
353 TEST_P(CompressionVarintTest, ConstantData) {
354 runSimpleTest(constantDataHolder);
// Parameter grid for CompressionVarintTest.
357 INSTANTIATE_TEST_CASE_P(
358 CompressionVarintTest,
359 CompressionVarintTest,
// log2 of the uncompressed length (SetUp() computes 1 << value).
361 testing::Values(0, 1, 12, 22, 25, 27),
// Only the listed *_VARINT_SIZE codecs, filtered through supportedCodecs().
362 testing::ValuesIn(supportedCodecs({
363 CodecType::LZ4_VARINT_SIZE,
364 CodecType::LZMA2_VARINT_SIZE,
// Streaming uncompress of input consisting solely of 0xff bytes must throw
// std::runtime_error. Runs only when the LZMA2_VARINT_SIZE stream codec exists.
367 TEST(LZMATest, UncompressBadVarint) {
368 if (hasStreamCodec(CodecType::LZMA2_VARINT_SIZE)) {
// Input: kMaxVarintLength64 * 2 bytes of 0xff.
369 std::string const str(kMaxVarintLength64 * 2, '\xff');
370 ByteRange input((folly::StringPiece(str)));
371 auto codec = getStreamCodec(CodecType::LZMA2_VARINT_SIZE);
// Output: a buffer whose whole capacity is made available as length.
372 auto buffer = IOBuf::create(16);
373 buffer->append(buffer->capacity());
374 MutableByteRange output{buffer->writableData(), buffer->length()};
375 EXPECT_THROW(codec->uncompressStream(input, output), std::runtime_error);
// Fixture parameterized on the codec type; exercises truncated and altered
// compressed input.
379 class CompressionCorruptionTest : public testing::TestWithParam<CodecType> {
381 void SetUp() override { codec_ = getCodec(GetParam()); }
383 void runSimpleTest(const DataHolder& dh);
385 std::unique_ptr<Codec> codec_;
// Compresses 42 bytes of `dh`, checks that the intact data round-trips, then
// expects uncompress() to throw for a wrong length, for data with its last
// byte removed, and for data whose first byte was incremented.
388 void CompressionCorruptionTest::runSimpleTest(const DataHolder& dh) {
389 constexpr uint64_t uncompressedLength = 42;
390 auto original = IOBuf::wrapBuffer(dh.data(uncompressedLength));
391 auto compressed = codec_->compress(original.get());
// Sanity check: the uncorrupted data uncompresses correctly.
393 if (!codec_->needsUncompressedLength()) {
394 auto uncompressed = codec_->uncompress(compressed.get());
395 EXPECT_EQ(uncompressedLength, uncompressed->computeChainDataLength());
396 EXPECT_EQ(dh.hash(uncompressedLength), hashIOBuf(uncompressed.get()));
399 auto uncompressed = codec_->uncompress(compressed.get(),
401 EXPECT_EQ(uncompressedLength, uncompressed->computeChainDataLength());
402 EXPECT_EQ(dh.hash(uncompressedLength), hashIOBuf(uncompressed.get()));
// A length that is off by one must be rejected.
405 EXPECT_THROW(codec_->uncompress(compressed.get(), uncompressedLength + 1),
// clone() + unshare() so the edits below work on a private copy.
408 auto corrupted = compressed->clone();
409 corrupted->unshare();
410 // Truncate the last character
// prev() of the chain head is the last buffer in the chain.
411 corrupted->prev()->trimEnd(1);
412 if (!codec_->needsUncompressedLength()) {
413 EXPECT_THROW(codec_->uncompress(corrupted.get()),
417 EXPECT_THROW(codec_->uncompress(corrupted.get(), uncompressedLength),
// Start again from a fresh private copy of the intact data.
420 corrupted = compressed->clone();
421 corrupted->unshare();
422 // Corrupt the first character
423 ++(corrupted->writableData()[0]);
425 if (!codec_->needsUncompressedLength()) {
426 EXPECT_THROW(codec_->uncompress(corrupted.get()),
430 EXPECT_THROW(codec_->uncompress(corrupted.get(), uncompressedLength),
// Corruption checks on the random input.
434 TEST_P(CompressionCorruptionTest, RandomData) {
435 runSimpleTest(randomDataHolder);
// Corruption checks on the constant input.
438 TEST_P(CompressionCorruptionTest, ConstantData) {
439 runSimpleTest(constantDataHolder);
// Codec list for CompressionCorruptionTest; the comments below record why
// some codecs are left out.
442 INSTANTIATE_TEST_CASE_P(
443 CompressionCorruptionTest,
444 CompressionCorruptionTest,
446 // NO_COMPRESSION can't detect corruption
447 // LZ4 can't detect corruption reliably (sigh)
453 CodecType::LZ4_FRAME,
// Fixture parameterized on the codec type; holds the stream codec under test.
457 class StreamingUnitTest : public testing::TestWithParam<CodecType> {
459 void SetUp() override {
460 codec_ = getStreamCodec(GetParam());
463 std::unique_ptr<StreamCodec> codec_;
// Checks StreamCodec::needsDataLength() against a fixed expectation table.
// This is a plain TEST, so it does not use the fixture's codec_.
466 TEST(StreamingUnitTest, needsDataLength) {
467 static const struct {
469 bool needsDataLength;
// In this table only LZMA2_VARINT_SIZE is expected to need the data length.
471 {CodecType::ZLIB, false},
472 {CodecType::GZIP, false},
473 {CodecType::LZMA2, false},
474 {CodecType::LZMA2_VARINT_SIZE, true},
475 {CodecType::ZSTD, false},
478 for (auto const& test : expectations) {
// Stream codecs that are not available in this build are skipped.
479 if (hasStreamCodec(test.type)) {
481 getStreamCodec(test.type)->needsDataLength(), test.needsDataLength);
// maxCompressedLength(0) must be 0, and for the sampled lengths the bound
// must be at least the input length.
486 TEST_P(StreamingUnitTest, maxCompressedLength) {
487 EXPECT_EQ(0, codec_->maxCompressedLength(0));
488 for (uint64_t const length : {1, 10, 100, 1000, 10000, 100000, 1000000}) {
489 EXPECT_GE(codec_->maxCompressedLength(length), length);
// Exercises getUncompressedLength() on an empty buffer and on a 100-byte
// buffer, with and without a caller-supplied length.
493 TEST_P(StreamingUnitTest, getUncompressedLength) {
// Empty data reports length 0, with or without an explicit 0.
494 auto const empty = IOBuf::create(0);
495 EXPECT_EQ(uint64_t(0), codec_->getUncompressedLength(empty.get()));
496 EXPECT_EQ(uint64_t(0), codec_->getUncompressedLength(empty.get(), 0));
498 auto const data = IOBuf::wrapBuffer(randomDataHolder.data(100));
// NOTE(review): `compressed` is never read below; every call passes the raw
// `data` buffer. Confirm whether compressed.get() was intended.
499 auto const compressed = codec_->compress(data.get());
// Non-empty data combined with a claimed length of 0 must throw.
501 EXPECT_ANY_THROW(codec_->getUncompressedLength(data.get(), 0));
// Checked only when the call yields a value.
502 if (auto const length = codec_->getUncompressedLength(data.get())) {
503 EXPECT_EQ(100, *length);
505 EXPECT_EQ(uint64_t(100), codec_->getUncompressedLength(data.get(), 100));
506 // If the uncompressed length is stored in the frame, then make sure it throws
507 // when it is given the wrong length.
508 if (codec_->getUncompressedLength(data.get()) == uint64_t(100)) {
509 EXPECT_ANY_THROW(codec_->getUncompressedLength(data.get(), 200));
// Streams empty input through compressStream()/uncompressStream() with every
// flush operation, using both an empty and a non-empty output range.
513 TEST_P(StreamingUnitTest, emptyData) {
// Scratch buffer used for the non-empty-output cases.
515 auto buffer = IOBuf::create(1);
516 buffer->append(buffer->capacity());
// Start with an empty output range.
517 MutableByteRange output{};
519 // Test compressing empty data in one pass
520 if (!codec_->needsDataLength()) {
522 codec_->compressStream(input, output, StreamCodec::FlushOp::END));
524 codec_->resetStream(0);
525 EXPECT_TRUE(codec_->compressStream(input, output, StreamCodec::FlushOp::END));
// Repeat with real output space; the final check expects none of it consumed.
526 output = {buffer->writableData(), buffer->length()};
527 EXPECT_TRUE(codec_->compressStream(input, output, StreamCodec::FlushOp::END));
528 EXPECT_EQ(buffer->length(), output.size());
530 // Test compressing empty data with multiple calls to compressStream()
531 codec_->resetStream(0);
533 EXPECT_FALSE(codec_->compressStream(input, output));
535 codec_->compressStream(input, output, StreamCodec::FlushOp::FLUSH));
536 EXPECT_TRUE(codec_->compressStream(input, output, StreamCodec::FlushOp::END));
537 codec_->resetStream(0);
538 output = {buffer->writableData(), buffer->length()};
539 EXPECT_FALSE(codec_->compressStream(input, output));
541 codec_->compressStream(input, output, StreamCodec::FlushOp::FLUSH));
542 EXPECT_TRUE(codec_->compressStream(input, output, StreamCodec::FlushOp::END));
543 EXPECT_EQ(buffer->length(), output.size());
545 // Test uncompressing empty data
// Each flush operation is tried after resetStream() and after resetStream(0).
547 codec_->resetStream();
548 EXPECT_TRUE(codec_->uncompressStream(input, output));
549 codec_->resetStream();
551 codec_->uncompressStream(input, output, StreamCodec::FlushOp::FLUSH));
552 codec_->resetStream();
554 codec_->uncompressStream(input, output, StreamCodec::FlushOp::END));
555 codec_->resetStream(0);
556 EXPECT_TRUE(codec_->uncompressStream(input, output));
557 codec_->resetStream(0);
559 codec_->uncompressStream(input, output, StreamCodec::FlushOp::FLUSH));
560 codec_->resetStream(0);
562 codec_->uncompressStream(input, output, StreamCodec::FlushOp::END));
565 TEST_P(StreamingUnitTest, noForwardProgressOkay) {
566 auto inBuffer = IOBuf::create(2);
567 inBuffer->writableData()[0] = 'a';
568 inBuffer->writableData()[0] = 'a';
570 auto input = inBuffer->coalesce();
571 auto compressed = codec_->compress(inBuffer.get());
573 auto outBuffer = IOBuf::create(codec_->maxCompressedLength(2));
574 MutableByteRange output{outBuffer->writableTail(), outBuffer->tailroom()};
576 ByteRange emptyInput;
577 MutableByteRange emptyOutput;
579 // Compress some data to avoid empty data special casing
580 if (codec_->needsDataLength()) {
581 codec_->resetStream(inBuffer->computeChainDataLength());
583 codec_->resetStream();
585 while (!input.empty()) {
586 codec_->compressStream(input, output);
588 // empty input and output is okay for flush NONE and FLUSH.
589 codec_->compressStream(emptyInput, emptyOutput);
590 codec_->compressStream(emptyInput, emptyOutput, StreamCodec::FlushOp::FLUSH);
592 if (codec_->needsDataLength()) {
593 codec_->resetStream(inBuffer->computeChainDataLength());
595 codec_->resetStream();
597 input = inBuffer->coalesce();
598 output = {outBuffer->writableTail(), outBuffer->tailroom()};
599 while (!input.empty()) {
600 codec_->compressStream(input, output);
602 // empty input and output is okay for flush END.
603 codec_->compressStream(emptyInput, emptyOutput, StreamCodec::FlushOp::END);
605 codec_->resetStream();
606 input = compressed->coalesce();
607 input.uncheckedSubtract(1); // Remove last byte so the operation is incomplete
608 output = {inBuffer->writableData(), inBuffer->length()};
609 // Uncompress some data to avoid empty data special casing
610 while (!input.empty()) {
611 EXPECT_FALSE(codec_->uncompressStream(input, output));
613 // empty input and output is okay for all flush values.
614 EXPECT_FALSE(codec_->uncompressStream(emptyInput, emptyOutput));
615 EXPECT_FALSE(codec_->uncompressStream(
616 emptyInput, emptyOutput, StreamCodec::FlushOp::FLUSH));
617 EXPECT_FALSE(codec_->uncompressStream(
618 emptyInput, emptyOutput, StreamCodec::FlushOp::END));
// Walks the stream codec through sequences of resetStream(),
// compressStream() and uncompressStream() calls, and expects std::logic_error
// for the sequences asserted with EXPECT_THROW below.
621 TEST_P(StreamingUnitTest, stateTransitions) {
// One-byte input and its one-shot compressed form.
622 auto inBuffer = IOBuf::create(1);
623 inBuffer->writableData()[0] = 'a';
625 auto compressed = codec_->compress(inBuffer.get());
626 ByteRange const in = compressed->coalesce();
627 auto outBuffer = IOBuf::create(codec_->maxCompressedLength(in.size()));
628 MutableByteRange const out{outBuffer->writableTail(), outBuffer->tailroom()};
// Helper: one compressStream() call; `empty` selects an empty output range.
631 StreamCodec::FlushOp flushOp = StreamCodec::FlushOp::NONE,
632 bool empty = false) {
634 auto output = empty ? MutableByteRange{} : out;
635 return codec_->compressStream(input, output, flushOp);
// Helper: calls compressStream() until the input is consumed, asserting the
// return value of each call.
637 auto compress_all = [&](bool expect,
638 StreamCodec::FlushOp flushOp =
639 StreamCodec::FlushOp::NONE,
640 bool empty = false) {
642 auto output = empty ? MutableByteRange{} : out;
643 while (!input.empty()) {
645 EXPECT_TRUE(codec_->compressStream(input, output, flushOp));
647 EXPECT_FALSE(codec_->compressStream(input, output, flushOp));
// Helper: one uncompressStream() call; `empty` selects an empty output range.
651 auto uncompress = [&](
652 StreamCodec::FlushOp flushOp = StreamCodec::FlushOp::NONE,
653 bool empty = false) {
655 auto output = empty ? MutableByteRange{} : out;
656 return codec_->uncompressStream(input, output, flushOp);
// Compression flow; codecs that need the data length are reset with one.
660 if (!codec_->needsDataLength()) {
661 codec_->resetStream();
662 EXPECT_FALSE(compress());
663 EXPECT_FALSE(compress());
664 EXPECT_TRUE(compress(StreamCodec::FlushOp::FLUSH));
665 EXPECT_FALSE(compress());
666 EXPECT_TRUE(compress(StreamCodec::FlushOp::END));
668 codec_->resetStream(in.size() * 5);
671 compress_all(true, StreamCodec::FlushOp::FLUSH);
673 compress_all(true, StreamCodec::FlushOp::END);
675 // uncompression flow
676 codec_->resetStream();
677 EXPECT_FALSE(uncompress(StreamCodec::FlushOp::NONE, true));
678 codec_->resetStream();
679 EXPECT_FALSE(uncompress(StreamCodec::FlushOp::FLUSH, true));
680 codec_->resetStream();
681 EXPECT_FALSE(uncompress(StreamCodec::FlushOp::NONE, true));
682 codec_->resetStream();
683 EXPECT_FALSE(uncompress(StreamCodec::FlushOp::NONE, true));
684 codec_->resetStream();
685 EXPECT_TRUE(uncompress(StreamCodec::FlushOp::FLUSH));
686 // compress -> uncompress
687 codec_->resetStream(in.size());
688 EXPECT_FALSE(compress());
689 EXPECT_THROW(uncompress(), std::logic_error);
690 // uncompress -> compress
691 codec_->resetStream(inBuffer->computeChainDataLength());
692 EXPECT_TRUE(uncompress(StreamCodec::FlushOp::FLUSH));
693 EXPECT_THROW(compress(), std::logic_error);
// Compressing again after a stream was finished with END must throw.
695 if (!codec_->needsDataLength()) {
696 codec_->resetStream();
697 EXPECT_FALSE(compress());
698 EXPECT_TRUE(compress(StreamCodec::FlushOp::END));
699 EXPECT_THROW(compress(), std::logic_error);
701 codec_->resetStream(in.size() * 2);
703 compress_all(true, StreamCodec::FlushOp::END);
704 EXPECT_THROW(compress(), std::logic_error);
// Uncompressing again after the frame has completed must throw.
706 codec_->resetStream();
707 EXPECT_TRUE(uncompress(StreamCodec::FlushOp::FLUSH));
708 EXPECT_THROW(uncompress(), std::logic_error);
// A FLUSH that returned false must not be followed by a plain compress().
710 codec_->resetStream(in.size());
711 EXPECT_FALSE(compress(StreamCodec::FlushOp::FLUSH, true));
712 EXPECT_THROW(compress(), std::logic_error);
// ...nor by a compress() with END.
714 codec_->resetStream(in.size());
715 EXPECT_FALSE(compress(StreamCodec::FlushOp::FLUSH, true));
716 EXPECT_THROW(compress(StreamCodec::FlushOp::END), std::logic_error);
717 // undefined -> compress
718 codec_->compress(inBuffer.get());
719 EXPECT_THROW(compress(), std::logic_error);
720 codec_->uncompress(compressed.get(), inBuffer->computeChainDataLength());
721 EXPECT_THROW(compress(), std::logic_error);
722 // undefined -> undefined
723 codec_->uncompress(compressed.get());
724 codec_->compress(inBuffer.get());
// Runs the StreamingUnitTest cases once per available stream codec.
727 INSTANTIATE_TEST_CASE_P(
730 testing::ValuesIn(availableStreamCodecs()));
// Fixture parameterized on (log2 of the uncompressed length, log2 of the
// chunk size, codec type).
732 class StreamingCompressionTest
733 : public testing::TestWithParam<std::tuple<int, int, CodecType>> {
// Unpacks the parameter tuple; both sizes are given as exponents of two.
735 void SetUp() override {
736 auto const tup = GetParam();
737 uncompressedLength_ = uint64_t(1) << std::get<0>(tup);
738 chunkSize_ = size_t(1) << std::get<1>(tup);
739 codec_ = getStreamCodec(std::get<2>(tup));
742 void runResetStreamTest(DataHolder const& dh);
743 void runCompressStreamTest(DataHolder const& dh);
744 void runUncompressStreamTest(DataHolder const& dh);
745 void runFlushTest(DataHolder const& dh);
// Cuts `data` into consecutive pieces of at most chunkSize_ bytes.
748 std::vector<ByteRange> split(ByteRange data) const;
750 uint64_t uncompressedLength_;
752 std::unique_ptr<StreamCodec> codec_;
// Cuts `data` into consecutive pieces of at most chunkSize_ bytes each, in
// order. Empty input produces no pieces.
755 std::vector<ByteRange> StreamingCompressionTest::split(ByteRange data) const {
// Estimate used only to reserve capacity.
756 size_t const pieces = std::max<size_t>(1, data.size() / chunkSize_);
757 std::vector<ByteRange> result;
758 result.reserve(pieces + 1);
759 while (!data.empty()) {
// The last piece may be shorter than chunkSize_.
760 size_t const pieceSize = std::min(data.size(), chunkSize_);
761 result.push_back(data.subpiece(0, pieceSize));
762 data.uncheckedAdvance(pieceSize);
// Feeds `data` to codec->compressStream() in a loop, giving each call a
// freshly allocated output buffer of bufferSize bytes and collecting the
// buffers in a queue. Expects all input to have been consumed at the end.
767 static std::unique_ptr<IOBuf> compressSome(
771 StreamCodec::FlushOp flush) {
// Expose the buffer's whole capacity as output space.
775 auto buffer = IOBuf::create(bufferSize);
776 buffer->append(buffer->capacity());
777 MutableByteRange output{buffer->writableData(), buffer->length()};
779 result = codec->compressStream(data, output, flush);
// Trim output.size() bytes off the end -- presumably the part of the output
// range the codec left unused.
780 buffer->trimEnd(output.size());
781 queue.append(std::move(buffer));
// Continue while the codec reports "not done", except that a NONE flush stops
// as soon as the input is exhausted.
783 } while (!(flush == StreamCodec::FlushOp::NONE && data.empty()) && !result);
784 EXPECT_TRUE(data.empty());
// Feeds `data` to codec->uncompressStream() in a loop, giving each call a
// freshly allocated output buffer of bufferSize bytes. Returns the codec's
// last return value together with the collected output chain.
788 static std::pair<bool, std::unique_ptr<IOBuf>> uncompressSome(
792 StreamCodec::FlushOp flush) {
// Expose the buffer's whole capacity as output space.
796 auto buffer = IOBuf::create(bufferSize);
797 buffer->append(buffer->capacity());
798 MutableByteRange output{buffer->writableData(), buffer->length()};
800 result = codec->uncompressStream(data, output, flush);
// Trim output.size() bytes off the end -- presumably the part of the output
// range the codec left unused.
801 buffer->trimEnd(output.size());
802 queue.append(std::move(buffer));
// Continue only while the queue has no tailroom and the codec is not done.
804 } while (queue.tailroom() == 0 && !result);
805 return std::make_pair(result, queue.move());
// Starts a compression without finishing it, resets the stream, then runs a
// complete compression and checks that it uncompresses to the input.
808 void StreamingCompressionTest::runResetStreamTest(DataHolder const& dh) {
809 auto const input = dh.data(uncompressedLength_);
810 // Compress some but leave state unclean
811 codec_->resetStream(uncompressedLength_);
812 compressSome(codec_.get(), input, chunkSize_, StreamCodec::FlushOp::NONE);
813 // Reset stream and compress all
814 if (codec_->needsDataLength()) {
815 codec_->resetStream(uncompressedLength_);
817 codec_->resetStream();
820 compressSome(codec_.get(), input, chunkSize_, StreamCodec::FlushOp::END);
// One-shot uncompress of the streamed output; compare hashes with the input.
821 auto const uncompressed = codec_->uncompress(compressed.get(), input.size());
822 EXPECT_EQ(dh.hash(uncompressedLength_), hashIOBuf(uncompressed.get()));
// Runs the reset-stream scenario on the constant and then the random input.
825 TEST_P(StreamingCompressionTest, resetStream) {
826 runResetStreamTest(constantDataHolder);
827 runResetStreamTest(randomDataHolder);
// Compresses the input piece by piece in one stream, finishes the stream with
// an empty input and FlushOp::END, then checks that a one-shot uncompress of
// the collected output matches the input's hash.
830 void StreamingCompressionTest::runCompressStreamTest(
831 const folly::io::test::DataHolder& dh) {
832 auto const inputs = split(dh.data(uncompressedLength_));
835 codec_->resetStream(uncompressedLength_);
836 // Compress many inputs in a row
837 for (auto const input : inputs) {
838 queue.append(compressSome(
839 codec_.get(), input, chunkSize_, StreamCodec::FlushOp::NONE));
841 // Finish the operation with empty input.
844 compressSome(codec_.get(), empty, chunkSize_, StreamCodec::FlushOp::END));
846 auto const uncompressed = codec_->uncompress(queue.front());
847 EXPECT_EQ(dh.hash(uncompressedLength_), hashIOBuf(uncompressed.get()));
// Runs the streaming-compress scenario on the constant and then the random input.
850 TEST_P(StreamingCompressionTest, compressStream) {
851 runCompressStreamTest(constantDataHolder);
852 runCompressStreamTest(randomDataHolder);
// Concatenates three compressed frames of the same data into one input range
// and uncompresses them one stream at a time. Each stream must report
// completion and reproduce the data, and the input must be fully consumed
// after the third.
855 void StreamingCompressionTest::runUncompressStreamTest(
856 const folly::io::test::DataHolder& dh) {
857 auto const data = IOBuf::wrapBuffer(dh.data(uncompressedLength_));
858 // Concatenate 3 compressed frames in a row
859 auto compressed = codec_->compress(data.get());
860 compressed->prependChain(codec_->compress(data.get()));
861 compressed->prependChain(codec_->compress(data.get()));
862 // Pass all 3 compressed frames in one input buffer
863 auto input = compressed->coalesce();
864 // Uncompress the first frame
// This reset passes the expected length; the two resets below do not.
865 codec_->resetStream(data->computeChainDataLength());
867 auto const result = uncompressSome(
868 codec_.get(), input, chunkSize_, StreamCodec::FlushOp::FLUSH);
869 ASSERT_TRUE(result.first);
870 ASSERT_EQ(hashIOBuf(data.get()), hashIOBuf(result.second.get()));
872 // Uncompress the second frame
873 codec_->resetStream();
875 auto const result = uncompressSome(
876 codec_.get(), input, chunkSize_, StreamCodec::FlushOp::END);
877 ASSERT_TRUE(result.first);
878 ASSERT_EQ(hashIOBuf(data.get()), hashIOBuf(result.second.get()));
880 // Uncompress the third frame
881 codec_->resetStream();
883 auto const result = uncompressSome(
884 codec_.get(), input, chunkSize_, StreamCodec::FlushOp::FLUSH);
885 ASSERT_TRUE(result.first);
886 ASSERT_EQ(hashIOBuf(data.get()), hashIOBuf(result.second.get()));
// Nothing may be left over once all three frames are read.
888 EXPECT_TRUE(input.empty());
// Runs the streaming-uncompress scenario on the constant and then the random input.
891 TEST_P(StreamingCompressionTest, uncompressStream) {
892 runUncompressStreamTest(constantDataHolder);
893 runUncompressStreamTest(randomDataHolder);
// For each input piece: compress it with FlushOp::FLUSH, pass the flushed
// bytes to the uncompressor, and check that exactly that piece comes back
// while the frame is still reported as incomplete.
896 void StreamingCompressionTest::runFlushTest(DataHolder const& dh) {
897 auto const inputs = split(dh.data(uncompressedLength_));
// A second codec instance of the same type.
898 auto uncodec = getStreamCodec(codec_->type());
900 if (codec_->needsDataLength()) {
901 codec_->resetStream(uncompressedLength_);
903 codec_->resetStream();
905 for (auto input : inputs) {
906 // Compress some data and flush the stream
907 auto compressed = compressSome(
908 codec_.get(), input, chunkSize_, StreamCodec::FlushOp::FLUSH);
909 auto compressedRange = compressed->coalesce();
910 // Uncompress the compressed data
911 auto result = uncompressSome(
915 StreamCodec::FlushOp::FLUSH);
916 // All compressed data should have been consumed
917 EXPECT_TRUE(compressedRange.empty());
918 // The frame isn't complete
919 EXPECT_FALSE(result.first);
920 // The uncompressed data should be exactly the input data
921 EXPECT_EQ(input.size(), result.second->computeChainDataLength());
922 auto const data = IOBuf::wrapBuffer(input);
923 EXPECT_EQ(hashIOBuf(data.get()), hashIOBuf(result.second.get()));
// Runs the flush scenario on the constant and then the random input.
927 TEST_P(StreamingCompressionTest, testFlush) {
928 runFlushTest(constantDataHolder);
929 runFlushTest(randomDataHolder);
// Parameter grid for StreamingCompressionTest.
932 INSTANTIATE_TEST_CASE_P(
933 StreamingCompressionTest,
934 StreamingCompressionTest,
// log2 of the uncompressed length (SetUp() computes 1 << value).
936 testing::Values(0, 1, 12, 22, 27),
// log2 of the chunk size (SetUp() computes 1 << value).
937 testing::Values(12, 17, 20),
938 testing::ValuesIn(availableStreamCodecs())));
// Fixture parameterized on the codec type. codec_ is the codec named by the
// parameter; auto_ is the codec returned by getAutoUncompressionCodec().
940 class AutomaticCodecTest : public testing::TestWithParam<CodecType> {
942 void SetUp() override {
943 codec_ = getCodec(GetParam());
944 auto_ = getAutoUncompressionCodec();
947 void runSimpleTest(const DataHolder& dh);
949 std::unique_ptr<Codec> codec_;
950 std::unique_ptr<Codec> auto_;
// Compresses 1000 bytes with codec_ and checks that auto_ uncompresses them
// correctly, both for the intact buffer and for chains in which the first
// buffer holds only the first 0..7 bytes.
953 void AutomaticCodecTest::runSimpleTest(const DataHolder& dh) {
954 constexpr uint64_t uncompressedLength = 1000;
955 auto original = IOBuf::wrapBuffer(dh.data(uncompressedLength));
956 auto compressed = codec_->compress(original.get());
// Without a length, when codec_ does not require one.
958 if (!codec_->needsUncompressedLength()) {
959 auto uncompressed = auto_->uncompress(compressed.get());
960 EXPECT_EQ(uncompressedLength, uncompressed->computeChainDataLength());
961 EXPECT_EQ(dh.hash(uncompressedLength), hashIOBuf(uncompressed.get()));
// With the length supplied.
964 auto uncompressed = auto_->uncompress(compressed.get(), uncompressedLength);
965 EXPECT_EQ(uncompressedLength, uncompressed->computeChainDataLength());
966 EXPECT_EQ(dh.hash(uncompressedLength), hashIOBuf(uncompressed.get()));
// The loop below uses split points up to 7, so require at least 8 bytes.
968 ASSERT_GE(compressed->computeChainDataLength(), 8);
969 for (size_t i = 0; i < 8; ++i) {
970 auto split = compressed->clone();
971 auto rest = compressed->clone();
// Keep only the first i bytes in `split`.
972 split->trimEnd(split->length() - i);
974 split->appendChain(std::move(rest));
975 auto uncompressed = auto_->uncompress(split.get(), uncompressedLength);
976 EXPECT_EQ(uncompressedLength, uncompressed->computeChainDataLength());
977 EXPECT_EQ(dh.hash(uncompressedLength), hashIOBuf(uncompressed.get()));
// Automatic-codec round trip on the random input.
981 TEST_P(AutomaticCodecTest, RandomData) {
982 runSimpleTest(randomDataHolder);
// Automatic-codec round trip on the constant input.
985 TEST_P(AutomaticCodecTest, ConstantData) {
986 runSimpleTest(constantDataHolder);
// Every prefix reported by codec_->validPrefixes() must be non-empty, and a
// buffer starting with it must be accepted by canUncompress() on both codec_
// and auto_.
989 TEST_P(AutomaticCodecTest, ValidPrefixes) {
990 const auto prefixes = codec_->validPrefixes();
991 for (const auto& prefix : prefixes) {
992 EXPECT_FALSE(prefix.empty());
993 // Ensure that all strings are at least 8 bytes for LZMA2.
994 // The bytes after the prefix should be ignored by `canUncompress()`.
995 IOBuf data{IOBuf::COPY_BUFFER, prefix, 0, 8};
997 EXPECT_TRUE(codec_->canUncompress(&data));
998 EXPECT_TRUE(auto_->canUncompress(&data));
// If codec_ needs the uncompressed length, auto_ must report that it does too.
1002 TEST_P(AutomaticCodecTest, NeedsUncompressedLength) {
1003 if (codec_->needsUncompressedLength()) {
1004 EXPECT_TRUE(auto_->needsUncompressedLength());
// auto_'s maximum uncompressed length must be at least that of codec_.
1008 TEST_P(AutomaticCodecTest, maxUncompressedLength) {
1009 EXPECT_LE(codec_->maxUncompressedLength(), auto_->maxUncompressedLength());
// Builds an automatic codec from an explicit list holding only a ZSTD codec
// and checks that it still uncompresses data produced by codec_.
1012 TEST_P(AutomaticCodecTest, DefaultCodec) {
1013 const uint64_t length = 42;
1014 std::vector<std::unique_ptr<Codec>> codecs;
1015 codecs.push_back(getCodec(CodecType::ZSTD));
1016 auto automatic = getAutoUncompressionCodec(std::move(codecs));
1017 auto original = IOBuf::wrapBuffer(constantDataHolder.data(length));
1018 auto compressed = codec_->compress(original.get());
1019 auto decompressed = automatic->uncompress(compressed.get());
1021 EXPECT_EQ(constantDataHolder.hash(length), hashIOBuf(decompressed.get()));
// Test-only codec of type USER_DEFINED. It writes a fixed prefix in front of
// the output of a wrapped codec and removes the first prefix_.size() bytes
// again before uncompressing.
1025 class CustomCodec : public Codec {
// Factory returning the codec as a std::unique_ptr<Codec>.
1027 static std::unique_ptr<Codec> create(std::string prefix, CodecType type) {
1028 return std::make_unique<CustomCodec>(std::move(prefix), type);
// `type` selects the wrapped codec (obtained through getCodec()).
1030 explicit CustomCodec(std::string prefix, CodecType type)
1031 : Codec(CodecType::USER_DEFINED),
1032 prefix_(std::move(prefix)),
1033 codec_(getCodec(type)) {}
1036 std::vector<std::string> validPrefixes() const override {
// The wrapped codec's bound plus the bytes added for the prefix.
1040 uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override {
1041 return codec_->maxCompressedLength(uncompressedLength) + prefix_.size();
// Compares the start of the coalesced data with prefix_.
1044 bool canUncompress(const IOBuf* data, Optional<uint64_t>) const override {
1045 auto clone = data->cloneCoalescedAsValue();
1046 if (clone.length() < prefix_.size()) {
1049 return memcmp(clone.data(), prefix_.data(), prefix_.size()) == 0;
// Output is prefix_ followed by the wrapped codec's compressed data.
1052 std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override {
1053 auto result = IOBuf::copyBuffer(prefix_);
1054 result->appendChain(codec_->compress(data));
1055 EXPECT_TRUE(canUncompress(result.get(), data->computeChainDataLength()));
// Removes the first prefix_.size() bytes and delegates to the wrapped codec.
1059 std::unique_ptr<IOBuf> doUncompress(
1061 Optional<uint64_t> uncompressedLength) override {
1062 EXPECT_TRUE(canUncompress(data, uncompressedLength));
1063 auto clone = data->cloneCoalescedAsValue();
1064 clone.trimStart(prefix_.size());
1065 return codec_->uncompress(&clone, uncompressedLength);
1068 std::string prefix_;
1069 std::unique_ptr<Codec> codec_;
// Builds an automatic codec from a list holding a CustomCodec with prefix
// "ab" and checks that it uncompresses both the custom-framed data and data
// produced by codec_. The default auto_ must reject the custom framing.
1073 TEST_P(AutomaticCodecTest, CustomCodec) {
1074 const uint64_t length = 42;
1075 auto ab = CustomCodec::create("ab", CodecType::ZSTD);
1076 std::vector<std::unique_ptr<Codec>> codecs;
1077 codecs.push_back(CustomCodec::create("ab", CodecType::ZSTD));
1078 auto automatic = getAutoUncompressionCodec(std::move(codecs));
1079 auto original = IOBuf::wrapBuffer(constantDataHolder.data(length));
// Data framed by the custom codec.
1081 auto abCompressed = ab->compress(original.get());
1082 auto abDecompressed = automatic->uncompress(abCompressed.get());
1083 EXPECT_TRUE(automatic->canUncompress(abCompressed.get()));
1084 EXPECT_FALSE(auto_->canUncompress(abCompressed.get()));
1085 EXPECT_EQ(constantDataHolder.hash(length), hashIOBuf(abDecompressed.get()));
// Data produced by the parameterized codec.
1087 auto compressed = codec_->compress(original.get());
1088 auto decompressed = automatic->uncompress(compressed.get());
1089 EXPECT_EQ(constantDataHolder.hash(length), hashIOBuf(decompressed.get()));
1092 TEST_P(AutomaticCodecTest, CustomDefaultCodec) {
1093 const uint64_t length = 42;
1094 auto none = CustomCodec::create("none", CodecType::NO_COMPRESSION);
1095 std::vector<std::unique_ptr<Codec>> codecs;
1096 codecs.push_back(CustomCodec::create("none", CodecType::NO_COMPRESSION));
1097 codecs.push_back(getCodec(CodecType::LZ4_FRAME));
1098 auto automatic = getAutoUncompressionCodec(std::move(codecs));
1099 auto original = IOBuf::wrapBuffer(constantDataHolder.data(length));
1101 auto noneCompressed = none->compress(original.get());
1102 auto noneDecompressed = automatic->uncompress(noneCompressed.get());
1103 EXPECT_TRUE(automatic->canUncompress(noneCompressed.get()));
1104 EXPECT_FALSE(auto_->canUncompress(noneCompressed.get()));
1105 EXPECT_EQ(constantDataHolder.hash(length), hashIOBuf(noneDecompressed.get()));
1107 auto compressed = codec_->compress(original.get());
1108 auto decompressed = automatic->uncompress(compressed.get());
1109 EXPECT_EQ(constantDataHolder.hash(length), hashIOBuf(decompressed.get()));
1112 TEST_P(AutomaticCodecTest, canUncompressOneBytes) {
1113 // No default codec can uncompress 1 bytes.
1114 IOBuf buf{IOBuf::CREATE, 1};
1116 EXPECT_FALSE(codec_->canUncompress(&buf, 1));
1117 EXPECT_FALSE(codec_->canUncompress(&buf, folly::none));
1118 EXPECT_FALSE(auto_->canUncompress(&buf, 1));
1119 EXPECT_FALSE(auto_->canUncompress(&buf, folly::none));
1122 INSTANTIATE_TEST_CASE_P(
1126 CodecType::LZ4_FRAME,
1133 TEST(ValidPrefixesTest, CustomCodec) {
1134 std::vector<std::unique_ptr<Codec>> codecs;
1135 codecs.push_back(CustomCodec::create("none", CodecType::NO_COMPRESSION));
1136 const auto none = getAutoUncompressionCodec(std::move(codecs));
1137 const auto prefixes = none->validPrefixes();
1138 const auto it = std::find(prefixes.begin(), prefixes.end(), "none");
1139 EXPECT_TRUE(it != prefixes.end());
1142 #define EXPECT_THROW_IF_DEBUG(statement, expected_exception) \
1145 EXPECT_THROW((statement), expected_exception); \
1147 EXPECT_NO_THROW((statement)); \
1151 TEST(CheckCompatibleTest, SimplePrefixSecond) {
1152 std::vector<std::unique_ptr<Codec>> codecs;
1153 codecs.push_back(CustomCodec::create("abc", CodecType::NO_COMPRESSION));
1154 codecs.push_back(CustomCodec::create("ab", CodecType::NO_COMPRESSION));
1155 EXPECT_THROW_IF_DEBUG(
1156 getAutoUncompressionCodec(std::move(codecs)), std::invalid_argument);
1159 TEST(CheckCompatibleTest, SimplePrefixFirst) {
1160 std::vector<std::unique_ptr<Codec>> codecs;
1161 codecs.push_back(CustomCodec::create("ab", CodecType::NO_COMPRESSION));
1162 codecs.push_back(CustomCodec::create("abc", CodecType::NO_COMPRESSION));
1163 EXPECT_THROW_IF_DEBUG(
1164 getAutoUncompressionCodec(std::move(codecs)), std::invalid_argument);
1167 TEST(CheckCompatibleTest, Empty) {
1168 std::vector<std::unique_ptr<Codec>> codecs;
1169 codecs.push_back(CustomCodec::create("", CodecType::NO_COMPRESSION));
1170 EXPECT_THROW_IF_DEBUG(
1171 getAutoUncompressionCodec(std::move(codecs)), std::invalid_argument);
1174 TEST(CheckCompatibleTest, ZstdPrefix) {
1175 std::vector<std::unique_ptr<Codec>> codecs;
1176 codecs.push_back(CustomCodec::create("\x28\xB5\x2F", CodecType::ZSTD));
1177 EXPECT_THROW_IF_DEBUG(
1178 getAutoUncompressionCodec(std::move(codecs)), std::invalid_argument);
1181 TEST(CheckCompatibleTest, ZstdDuplicate) {
1182 std::vector<std::unique_ptr<Codec>> codecs;
1183 codecs.push_back(CustomCodec::create("\x28\xB5\x2F\xFD", CodecType::ZSTD));
1184 EXPECT_THROW_IF_DEBUG(
1185 getAutoUncompressionCodec(std::move(codecs)), std::invalid_argument);
1188 TEST(CheckCompatibleTest, ZlibIsPrefix) {
1189 std::vector<std::unique_ptr<Codec>> codecs;
1190 codecs.push_back(CustomCodec::create("\x18\x76zzasdf", CodecType::ZSTD));
1191 EXPECT_THROW_IF_DEBUG(
1192 getAutoUncompressionCodec(std::move(codecs)), std::invalid_argument);
1195 #if FOLLY_HAVE_LIBZSTD
1197 TEST(ZstdTest, BackwardCompatible) {
1198 auto codec = getCodec(CodecType::ZSTD);
1200 auto const data = IOBuf::wrapBuffer(randomDataHolder.data(size_t(1) << 20));
1201 auto compressed = codec->compress(data.get());
1202 compressed->coalesce();
1205 ZSTD_getDecompressedSize(compressed->data(), compressed->length()));
1209 IOBuf::wrapBuffer(randomDataHolder.data(size_t(100) << 20));
1210 auto compressed = codec->compress(data.get());
1211 compressed->coalesce();
1214 ZSTD_getDecompressedSize(compressed->data(), compressed->length()));
// Short alias for zlib::Options::Format, used by the zlib tests that follow.
using ZlibFormat = zlib::Options::Format;
1224 TEST(ZlibTest, Auto) {
1225 size_t const uncompressedLength_ = (size_t)1 << 15;
1226 auto const original = std::string(
1227 reinterpret_cast<const char*>(
1228 randomDataHolder.data(uncompressedLength_).data()),
1229 uncompressedLength_);
1230 auto optionCodec = zlib::getCodec(zlib::Options(ZlibFormat::AUTO));
1232 // Test the codec can uncompress zlib data.
1234 auto codec = getCodec(CodecType::ZLIB);
1235 auto const compressed = codec->compress(original);
1236 auto const uncompressed = optionCodec->uncompress(compressed);
1237 EXPECT_EQ(original, uncompressed);
1240 // Test the codec can uncompress gzip data.
1242 auto codec = getCodec(CodecType::GZIP);
1243 auto const compressed = codec->compress(original);
1244 auto const uncompressed = optionCodec->uncompress(compressed);
1245 EXPECT_EQ(original, uncompressed);
1249 TEST(ZlibTest, DefaultOptions) {
1250 size_t const uncompressedLength_ = (size_t)1 << 20;
1251 auto const original = std::string(
1252 reinterpret_cast<const char*>(
1253 randomDataHolder.data(uncompressedLength_).data()),
1254 uncompressedLength_);
1256 auto codec = getCodec(CodecType::ZLIB);
1257 auto optionCodec = zlib::getCodec(zlib::defaultZlibOptions());
1258 auto const compressed = optionCodec->compress(original);
1259 auto uncompressed = codec->uncompress(compressed);
1260 EXPECT_EQ(original, uncompressed);
1261 uncompressed = optionCodec->uncompress(compressed);
1262 EXPECT_EQ(original, uncompressed);
1266 auto codec = getCodec(CodecType::GZIP);
1267 auto optionCodec = zlib::getCodec(zlib::defaultGzipOptions());
1268 auto const compressed = optionCodec->compress(original);
1269 auto uncompressed = codec->uncompress(compressed);
1270 EXPECT_EQ(original, uncompressed);
1271 uncompressed = optionCodec->uncompress(compressed);
1272 EXPECT_EQ(original, uncompressed);
1276 class ZlibOptionsTest : public testing::TestWithParam<
1277 std::tr1::tuple<ZlibFormat, int, int, int>> {
1279 void SetUp() override {
1280 auto tup = GetParam();
1281 options_.format = std::tr1::get<0>(tup);
1282 options_.windowSize = std::tr1::get<1>(tup);
1283 options_.memLevel = std::tr1::get<2>(tup);
1284 options_.strategy = std::tr1::get<3>(tup);
1285 codec_ = zlib::getStreamCodec(options_);
1288 void runSimpleRoundTripTest(const DataHolder& dh);
1291 zlib::Options options_;
1292 std::unique_ptr<StreamCodec> codec_;
1295 void ZlibOptionsTest::runSimpleRoundTripTest(const DataHolder& dh) {
1296 size_t const uncompressedLength = (size_t)1 << 16;
1297 auto const original = std::string(
1298 reinterpret_cast<const char*>(dh.data(uncompressedLength).data()),
1299 uncompressedLength);
1301 auto const compressed = codec_->compress(original);
1302 auto const uncompressed = codec_->uncompress(compressed);
1303 EXPECT_EQ(uncompressed, original);
1306 TEST_P(ZlibOptionsTest, simpleRoundTripTest) {
1307 runSimpleRoundTripTest(constantDataHolder);
1308 runSimpleRoundTripTest(randomDataHolder);
1311 INSTANTIATE_TEST_CASE_P(
1320 testing::Values(9, 12, 15),
1321 testing::Values(1, 8, 9),
1329 #endif // FOLLY_HAVE_LIBZ
1333 } // namespace folly
1335 int main(int argc, char *argv[]) {
1336 testing::InitGoogleTest(&argc, argv);
1337 gflags::ParseCommandLineFlags(&argc, &argv, true);
1339 auto ret = RUN_ALL_TESTS();
1341 folly::runBenchmarksOnFlag();