2 * Copyright 2017 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #include <folly/io/Compression.h>
23 #include <unordered_map>
26 #include <boost/noncopyable.hpp>
27 #include <glog/logging.h>
29 #include <folly/Benchmark.h>
30 #include <folly/Hash.h>
31 #include <folly/Memory.h>
32 #include <folly/Random.h>
33 #include <folly/Varint.h>
34 #include <folly/io/IOBufQueue.h>
35 #include <folly/portability/GTest.h>
37 #if FOLLY_HAVE_LIBZSTD
45 class DataHolder : private boost::noncopyable {
47 uint64_t hash(size_t size) const;
48 ByteRange data(size_t size) const;
51 explicit DataHolder(size_t sizeLog2);
53 std::unique_ptr<uint8_t[]> data_;
54 mutable std::unordered_map<uint64_t, uint64_t> hashCache_;
57 DataHolder::DataHolder(size_t sizeLog2)
58 : size_(size_t(1) << sizeLog2),
59 data_(new uint8_t[size_]) {
62 uint64_t DataHolder::hash(size_t size) const {
63 CHECK_LE(size, size_);
64 auto p = hashCache_.find(size);
65 if (p != hashCache_.end()) {
69 uint64_t h = folly::hash::fnv64_buf(data_.get(), size);
74 ByteRange DataHolder::data(size_t size) const {
75 CHECK_LE(size, size_);
76 return ByteRange(data_.get(), size);
79 uint64_t hashIOBuf(const IOBuf* buf) {
80 uint64_t h = folly::hash::FNV_64_HASH_START;
81 for (auto& range : *buf) {
82 h = folly::hash::fnv64_buf(range.data(), range.size(), h);
87 class RandomDataHolder : public DataHolder {
89 explicit RandomDataHolder(size_t sizeLog2);
92 RandomDataHolder::RandomDataHolder(size_t sizeLog2)
93 : DataHolder(sizeLog2) {
94 static constexpr size_t numThreadsLog2 = 3;
95 static constexpr size_t numThreads = size_t(1) << numThreadsLog2;
97 uint32_t seed = randomNumberSeed();
99 std::vector<std::thread> threads;
100 threads.reserve(numThreads);
101 for (size_t t = 0; t < numThreads; ++t) {
102 threads.emplace_back([this, seed, t, sizeLog2] {
103 std::mt19937 rng(seed + t);
104 size_t countLog2 = sizeLog2 - numThreadsLog2;
105 size_t start = size_t(t) << countLog2;
106 for (size_t i = 0; i < countLog2; ++i) {
107 this->data_[start + i] = rng();
112 for (auto& t : threads) {
117 class ConstantDataHolder : public DataHolder {
119 explicit ConstantDataHolder(size_t sizeLog2);
122 ConstantDataHolder::ConstantDataHolder(size_t sizeLog2)
123 : DataHolder(sizeLog2) {
124 memset(data_.get(), 'a', size_);
127 constexpr size_t dataSizeLog2 = 27; // 128MiB
128 RandomDataHolder randomDataHolder(dataSizeLog2);
129 ConstantDataHolder constantDataHolder(dataSizeLog2);
131 // The intersection of the provided codecs & those that are compiled in.
132 static std::vector<CodecType> supportedCodecs(std::vector<CodecType> const& v) {
133 std::vector<CodecType> supported;
138 std::back_inserter(supported),
144 // All compiled-in compression codecs.
145 static std::vector<CodecType> availableCodecs() {
146 std::vector<CodecType> codecs;
148 for (size_t i = 0; i < static_cast<size_t>(CodecType::NUM_CODEC_TYPES); ++i) {
149 auto type = static_cast<CodecType>(i);
150 if (hasCodec(type)) {
151 codecs.push_back(type);
158 static std::vector<CodecType> availableStreamCodecs() {
159 std::vector<CodecType> codecs;
161 for (size_t i = 0; i < static_cast<size_t>(CodecType::NUM_CODEC_TYPES); ++i) {
162 auto type = static_cast<CodecType>(i);
163 if (hasStreamCodec(type)) {
164 codecs.push_back(type);
171 TEST(CompressionTestNeedsUncompressedLength, Simple) {
172 static const struct { CodecType type; bool needsUncompressedLength; }
174 { CodecType::NO_COMPRESSION, false },
175 { CodecType::LZ4, true },
176 { CodecType::SNAPPY, false },
177 { CodecType::ZLIB, false },
178 { CodecType::LZ4_VARINT_SIZE, false },
179 { CodecType::LZMA2, false },
180 { CodecType::LZMA2_VARINT_SIZE, false },
181 { CodecType::ZSTD, false },
182 { CodecType::GZIP, false },
183 { CodecType::LZ4_FRAME, false },
184 { CodecType::BZIP2, false },
187 for (auto const& test : expectations) {
188 if (hasCodec(test.type)) {
189 EXPECT_EQ(getCodec(test.type)->needsUncompressedLength(),
190 test.needsUncompressedLength);
195 class CompressionTest
196 : public testing::TestWithParam<std::tr1::tuple<int, int, CodecType>> {
198 void SetUp() override {
199 auto tup = GetParam();
200 uncompressedLength_ = uint64_t(1) << std::tr1::get<0>(tup);
201 chunks_ = std::tr1::get<1>(tup);
202 codec_ = getCodec(std::tr1::get<2>(tup));
205 void runSimpleIOBufTest(const DataHolder& dh);
207 void runSimpleStringTest(const DataHolder& dh);
210 std::unique_ptr<IOBuf> split(std::unique_ptr<IOBuf> data) const;
212 uint64_t uncompressedLength_;
214 std::unique_ptr<Codec> codec_;
217 void CompressionTest::runSimpleIOBufTest(const DataHolder& dh) {
218 const auto original = split(IOBuf::wrapBuffer(dh.data(uncompressedLength_)));
219 const auto compressed = split(codec_->compress(original.get()));
220 if (!codec_->needsUncompressedLength()) {
221 auto uncompressed = codec_->uncompress(compressed.get());
222 EXPECT_EQ(uncompressedLength_, uncompressed->computeChainDataLength());
223 EXPECT_EQ(dh.hash(uncompressedLength_), hashIOBuf(uncompressed.get()));
226 auto uncompressed = codec_->uncompress(compressed.get(),
227 uncompressedLength_);
228 EXPECT_EQ(uncompressedLength_, uncompressed->computeChainDataLength());
229 EXPECT_EQ(dh.hash(uncompressedLength_), hashIOBuf(uncompressed.get()));
233 void CompressionTest::runSimpleStringTest(const DataHolder& dh) {
234 const auto original = std::string(
235 reinterpret_cast<const char*>(dh.data(uncompressedLength_).data()),
236 uncompressedLength_);
237 const auto compressed = codec_->compress(original);
238 if (!codec_->needsUncompressedLength()) {
239 auto uncompressed = codec_->uncompress(compressed);
240 EXPECT_EQ(uncompressedLength_, uncompressed.length());
241 EXPECT_EQ(uncompressed, original);
244 auto uncompressed = codec_->uncompress(compressed, uncompressedLength_);
245 EXPECT_EQ(uncompressedLength_, uncompressed.length());
246 EXPECT_EQ(uncompressed, original);
250 // Uniformly split data into (potentially empty) chunks.
251 std::unique_ptr<IOBuf> CompressionTest::split(
252 std::unique_ptr<IOBuf> data) const {
253 if (data->isChained()) {
257 const size_t size = data->computeChainDataLength();
259 std::multiset<size_t> splits;
260 for (size_t i = 1; i < chunks_; ++i) {
261 splits.insert(Random::rand64(size));
264 folly::IOBufQueue result;
267 for (size_t split : splits) {
268 result.append(IOBuf::copyBuffer(data->data() + offset, split - offset));
271 result.append(IOBuf::copyBuffer(data->data() + offset, size - offset));
273 return result.move();
276 TEST_P(CompressionTest, RandomData) {
277 runSimpleIOBufTest(randomDataHolder);
280 TEST_P(CompressionTest, ConstantData) {
281 runSimpleIOBufTest(constantDataHolder);
284 TEST_P(CompressionTest, RandomDataString) {
285 runSimpleStringTest(randomDataHolder);
288 TEST_P(CompressionTest, ConstantDataString) {
289 runSimpleStringTest(constantDataHolder);
292 INSTANTIATE_TEST_CASE_P(
296 testing::Values(0, 1, 12, 22, 25, 27),
297 testing::Values(1, 2, 3, 8, 65),
298 testing::ValuesIn(availableCodecs())));
300 class CompressionVarintTest
301 : public testing::TestWithParam<std::tr1::tuple<int, CodecType>> {
303 void SetUp() override {
304 auto tup = GetParam();
305 uncompressedLength_ = uint64_t(1) << std::tr1::get<0>(tup);
306 codec_ = getCodec(std::tr1::get<1>(tup));
309 void runSimpleTest(const DataHolder& dh);
311 uint64_t uncompressedLength_;
312 std::unique_ptr<Codec> codec_;
// Returns the 1-based position of the most significant set bit of `number`,
// i.e. how many right shifts it takes to reach zero. Returns 0 for 0.
inline uint64_t oneBasedMsbPos(uint64_t number) {
  uint64_t pos = 0;
  for (; number > 0; ++pos, number >>= 1) {
  }
  return pos;
}
322 void CompressionVarintTest::runSimpleTest(const DataHolder& dh) {
323 auto original = IOBuf::wrapBuffer(dh.data(uncompressedLength_));
324 auto compressed = codec_->compress(original.get());
328 std::max(uint64_t(9), oneBasedMsbPos(uncompressedLength_)) / 9UL);
329 auto tinyBuf = IOBuf::copyBuffer(compressed->data(),
330 std::min(compressed->length(), breakPoint));
331 compressed->trimStart(breakPoint);
332 tinyBuf->prependChain(std::move(compressed));
333 compressed = std::move(tinyBuf);
335 auto uncompressed = codec_->uncompress(compressed.get());
337 EXPECT_EQ(uncompressedLength_, uncompressed->computeChainDataLength());
338 EXPECT_EQ(dh.hash(uncompressedLength_), hashIOBuf(uncompressed.get()));
341 TEST_P(CompressionVarintTest, RandomData) {
342 runSimpleTest(randomDataHolder);
345 TEST_P(CompressionVarintTest, ConstantData) {
346 runSimpleTest(constantDataHolder);
349 INSTANTIATE_TEST_CASE_P(
350 CompressionVarintTest,
351 CompressionVarintTest,
353 testing::Values(0, 1, 12, 22, 25, 27),
354 testing::ValuesIn(supportedCodecs({
355 CodecType::LZ4_VARINT_SIZE,
356 CodecType::LZMA2_VARINT_SIZE,
359 class CompressionCorruptionTest : public testing::TestWithParam<CodecType> {
361 void SetUp() override { codec_ = getCodec(GetParam()); }
363 void runSimpleTest(const DataHolder& dh);
365 std::unique_ptr<Codec> codec_;
368 void CompressionCorruptionTest::runSimpleTest(const DataHolder& dh) {
369 constexpr uint64_t uncompressedLength = 42;
370 auto original = IOBuf::wrapBuffer(dh.data(uncompressedLength));
371 auto compressed = codec_->compress(original.get());
373 if (!codec_->needsUncompressedLength()) {
374 auto uncompressed = codec_->uncompress(compressed.get());
375 EXPECT_EQ(uncompressedLength, uncompressed->computeChainDataLength());
376 EXPECT_EQ(dh.hash(uncompressedLength), hashIOBuf(uncompressed.get()));
379 auto uncompressed = codec_->uncompress(compressed.get(),
381 EXPECT_EQ(uncompressedLength, uncompressed->computeChainDataLength());
382 EXPECT_EQ(dh.hash(uncompressedLength), hashIOBuf(uncompressed.get()));
385 EXPECT_THROW(codec_->uncompress(compressed.get(), uncompressedLength + 1),
388 auto corrupted = compressed->clone();
389 corrupted->unshare();
390 // Truncate the last character
391 corrupted->prev()->trimEnd(1);
392 if (!codec_->needsUncompressedLength()) {
393 EXPECT_THROW(codec_->uncompress(corrupted.get()),
397 EXPECT_THROW(codec_->uncompress(corrupted.get(), uncompressedLength),
400 corrupted = compressed->clone();
401 corrupted->unshare();
402 // Corrupt the first character
403 ++(corrupted->writableData()[0]);
405 if (!codec_->needsUncompressedLength()) {
406 EXPECT_THROW(codec_->uncompress(corrupted.get()),
410 EXPECT_THROW(codec_->uncompress(corrupted.get(), uncompressedLength),
414 TEST_P(CompressionCorruptionTest, RandomData) {
415 runSimpleTest(randomDataHolder);
418 TEST_P(CompressionCorruptionTest, ConstantData) {
419 runSimpleTest(constantDataHolder);
422 INSTANTIATE_TEST_CASE_P(
423 CompressionCorruptionTest,
424 CompressionCorruptionTest,
426 // NO_COMPRESSION can't detect corruption
427 // LZ4 can't detect corruption reliably (sigh)
433 CodecType::LZ4_FRAME,
437 class StreamingUnitTest : public testing::TestWithParam<CodecType> {
439 void SetUp() override {
440 codec_ = getStreamCodec(GetParam());
443 std::unique_ptr<StreamCodec> codec_;
446 TEST_P(StreamingUnitTest, maxCompressedLength) {
447 EXPECT_EQ(0, codec_->maxCompressedLength(0));
448 for (uint64_t const length : {1, 10, 100, 1000, 10000, 100000, 1000000}) {
449 EXPECT_GE(codec_->maxCompressedLength(length), length);
// Exercises getUncompressedLength() on an empty buffer and on a 100-byte
// buffer, with and without a caller-supplied length.
453 TEST_P(StreamingUnitTest, getUncompressedLength) {
454 auto const empty = IOBuf::create(0);
// An empty buffer is expected to report a length of 0 in both overloads.
455 EXPECT_EQ(uint64_t(0), codec_->getUncompressedLength(empty.get()));
456 EXPECT_EQ(uint64_t(0), codec_->getUncompressedLength(empty.get(), 0));
458 auto const data = IOBuf::wrapBuffer(randomDataHolder.data(100));
459 auto const compressed = codec_->compress(data.get());
// NOTE(review): `compressed` is never read below; every query in this test is
// made against the raw, uncompressed `data` buffer. Confirm whether
// `compressed.get()` was the intended argument.
461 EXPECT_ANY_THROW(codec_->getUncompressedLength(data.get(), 0));
462 if (auto const length = codec_->getUncompressedLength(data.get())) {
463 EXPECT_EQ(100, *length);
465 EXPECT_EQ(uint64_t(100), codec_->getUncompressedLength(data.get(), 100));
466 // If the uncompressed length is stored in the frame, then make sure it throws
467 // when it is given the wrong length.
468 if (codec_->getUncompressedLength(data.get()) == uint64_t(100)) {
469 EXPECT_ANY_THROW(codec_->getUncompressedLength(data.get(), 200));
473 TEST_P(StreamingUnitTest, emptyData) {
475 auto buffer = IOBuf::create(1);
476 buffer->append(buffer->capacity());
477 MutableByteRange output{};
479 // Test compressing empty data in one pass
480 EXPECT_TRUE(codec_->compressStream(input, output, StreamCodec::FlushOp::END));
481 codec_->resetStream(0);
482 EXPECT_TRUE(codec_->compressStream(input, output, StreamCodec::FlushOp::END));
483 codec_->resetStream();
484 output = {buffer->writableData(), buffer->length()};
485 EXPECT_TRUE(codec_->compressStream(input, output, StreamCodec::FlushOp::END));
486 EXPECT_EQ(buffer->length(), output.size());
488 // Test compressing empty data with multiple calls to compressStream()
489 codec_->resetStream();
491 EXPECT_FALSE(codec_->compressStream(input, output));
493 codec_->compressStream(input, output, StreamCodec::FlushOp::FLUSH));
494 EXPECT_TRUE(codec_->compressStream(input, output, StreamCodec::FlushOp::END));
495 codec_->resetStream();
496 output = {buffer->writableData(), buffer->length()};
497 EXPECT_FALSE(codec_->compressStream(input, output));
499 codec_->compressStream(input, output, StreamCodec::FlushOp::FLUSH));
500 EXPECT_TRUE(codec_->compressStream(input, output, StreamCodec::FlushOp::END));
501 EXPECT_EQ(buffer->length(), output.size());
503 // Test uncompressing empty data
505 codec_->resetStream();
506 EXPECT_TRUE(codec_->uncompressStream(input, output));
507 codec_->resetStream();
509 codec_->uncompressStream(input, output, StreamCodec::FlushOp::FLUSH));
510 codec_->resetStream();
512 codec_->uncompressStream(input, output, StreamCodec::FlushOp::END));
513 codec_->resetStream(0);
514 EXPECT_TRUE(codec_->uncompressStream(input, output));
515 codec_->resetStream(0);
517 codec_->uncompressStream(input, output, StreamCodec::FlushOp::FLUSH));
518 codec_->resetStream(0);
520 codec_->uncompressStream(input, output, StreamCodec::FlushOp::END));
523 TEST_P(StreamingUnitTest, noForwardProgressOkay) {
524 auto inBuffer = IOBuf::create(2);
525 inBuffer->writableData()[0] = 'a';
526 inBuffer->writableData()[0] = 'a';
528 auto input = inBuffer->coalesce();
529 auto compressed = codec_->compress(inBuffer.get());
531 auto outBuffer = IOBuf::create(codec_->maxCompressedLength(2));
532 MutableByteRange output{outBuffer->writableTail(), outBuffer->tailroom()};
534 ByteRange emptyInput;
535 MutableByteRange emptyOutput;
537 // Compress some data to avoid empty data special casing
538 codec_->resetStream();
539 while (!input.empty()) {
540 codec_->compressStream(input, output);
542 // empty input and output is okay for flush NONE and FLUSH.
543 codec_->compressStream(emptyInput, emptyOutput);
544 codec_->compressStream(emptyInput, emptyOutput, StreamCodec::FlushOp::FLUSH);
546 codec_->resetStream();
547 input = inBuffer->coalesce();
548 output = {outBuffer->writableTail(), outBuffer->tailroom()};
549 while (!input.empty()) {
550 codec_->compressStream(input, output);
552 // empty input and output is okay for flush END.
553 codec_->compressStream(emptyInput, emptyOutput, StreamCodec::FlushOp::END);
555 codec_->resetStream();
556 input = compressed->coalesce();
557 input.uncheckedSubtract(1); // Remove last byte so the operation is incomplete
558 output = {inBuffer->writableData(), inBuffer->length()};
559 // Uncompress some data to avoid empty data special casing
560 while (!input.empty()) {
561 EXPECT_FALSE(codec_->uncompressStream(input, output));
563 // empty input and output is okay for all flush values.
564 EXPECT_FALSE(codec_->uncompressStream(emptyInput, emptyOutput));
565 EXPECT_FALSE(codec_->uncompressStream(
566 emptyInput, emptyOutput, StreamCodec::FlushOp::FLUSH));
567 EXPECT_FALSE(codec_->uncompressStream(
568 emptyInput, emptyOutput, StreamCodec::FlushOp::END));
571 TEST_P(StreamingUnitTest, stateTransitions) {
572 auto inBuffer = IOBuf::create(1);
573 inBuffer->writableData()[0] = 'a';
575 auto compressed = codec_->compress(inBuffer.get());
576 ByteRange const in = compressed->coalesce();
577 auto outBuffer = IOBuf::create(codec_->maxCompressedLength(in.size()));
578 MutableByteRange const out{outBuffer->writableTail(), outBuffer->tailroom()};
581 StreamCodec::FlushOp flushOp = StreamCodec::FlushOp::NONE,
582 bool empty = false) {
584 auto output = empty ? MutableByteRange{} : out;
585 return codec_->compressStream(input, output, flushOp);
587 auto uncompress = [&](
588 StreamCodec::FlushOp flushOp = StreamCodec::FlushOp::NONE,
589 bool empty = false) {
591 auto output = empty ? MutableByteRange{} : out;
592 return codec_->uncompressStream(input, output, flushOp);
596 codec_->resetStream();
597 EXPECT_FALSE(compress());
598 EXPECT_FALSE(compress());
599 EXPECT_TRUE(compress(StreamCodec::FlushOp::FLUSH));
600 EXPECT_FALSE(compress());
601 EXPECT_TRUE(compress(StreamCodec::FlushOp::END));
602 // uncompression flow
603 codec_->resetStream();
604 EXPECT_FALSE(uncompress(StreamCodec::FlushOp::NONE, true));
605 codec_->resetStream();
606 EXPECT_FALSE(uncompress(StreamCodec::FlushOp::FLUSH, true));
607 codec_->resetStream();
608 EXPECT_FALSE(uncompress(StreamCodec::FlushOp::NONE, true));
609 codec_->resetStream();
610 EXPECT_FALSE(uncompress(StreamCodec::FlushOp::NONE, true));
611 codec_->resetStream();
612 EXPECT_TRUE(uncompress(StreamCodec::FlushOp::FLUSH));
613 // compress -> uncompress
614 codec_->resetStream();
615 EXPECT_FALSE(compress());
616 EXPECT_THROW(uncompress(), std::logic_error);
617 // uncompress -> compress
618 codec_->resetStream();
619 EXPECT_TRUE(uncompress(StreamCodec::FlushOp::FLUSH));
620 EXPECT_THROW(compress(), std::logic_error);
622 codec_->resetStream();
623 EXPECT_FALSE(compress());
624 EXPECT_TRUE(compress(StreamCodec::FlushOp::END));
625 EXPECT_THROW(compress(), std::logic_error);
627 codec_->resetStream();
628 EXPECT_TRUE(uncompress(StreamCodec::FlushOp::FLUSH));
629 EXPECT_THROW(uncompress(), std::logic_error);
631 codec_->resetStream();
632 EXPECT_FALSE(compress(StreamCodec::FlushOp::FLUSH, true));
633 EXPECT_THROW(compress(), std::logic_error);
635 codec_->resetStream();
636 EXPECT_FALSE(compress(StreamCodec::FlushOp::FLUSH, true));
637 EXPECT_THROW(compress(StreamCodec::FlushOp::END), std::logic_error);
638 // undefined -> compress
639 codec_->compress(inBuffer.get());
640 EXPECT_THROW(compress(), std::logic_error);
641 codec_->uncompress(compressed.get());
642 EXPECT_THROW(compress(), std::logic_error);
643 // undefined -> undefined
644 codec_->uncompress(compressed.get());
645 codec_->compress(inBuffer.get());
648 INSTANTIATE_TEST_CASE_P(
651 testing::ValuesIn(availableStreamCodecs()));
653 class StreamingCompressionTest
654 : public testing::TestWithParam<std::tuple<int, int, CodecType>> {
656 void SetUp() override {
657 auto const tup = GetParam();
658 uncompressedLength_ = uint64_t(1) << std::get<0>(tup);
659 chunkSize_ = size_t(1) << std::get<1>(tup);
660 codec_ = getStreamCodec(std::get<2>(tup));
663 void runResetStreamTest(DataHolder const& dh);
664 void runCompressStreamTest(DataHolder const& dh);
665 void runUncompressStreamTest(DataHolder const& dh);
666 void runFlushTest(DataHolder const& dh);
669 std::vector<ByteRange> split(ByteRange data) const;
671 uint64_t uncompressedLength_;
673 std::unique_ptr<StreamCodec> codec_;
676 std::vector<ByteRange> StreamingCompressionTest::split(ByteRange data) const {
677 size_t const pieces = std::max<size_t>(1, data.size() / chunkSize_);
678 std::vector<ByteRange> result;
679 result.reserve(pieces + 1);
680 while (!data.empty()) {
681 size_t const pieceSize = std::min(data.size(), chunkSize_);
682 result.push_back(data.subpiece(0, pieceSize));
683 data.uncheckedAdvance(pieceSize);
688 static std::unique_ptr<IOBuf> compressSome(
692 StreamCodec::FlushOp flush) {
696 auto buffer = IOBuf::create(bufferSize);
697 buffer->append(buffer->capacity());
698 MutableByteRange output{buffer->writableData(), buffer->length()};
700 result = codec->compressStream(data, output, flush);
701 buffer->trimEnd(output.size());
702 queue.append(std::move(buffer));
704 } while (!(flush == StreamCodec::FlushOp::NONE && data.empty()) && !result);
705 EXPECT_TRUE(data.empty());
709 static std::pair<bool, std::unique_ptr<IOBuf>> uncompressSome(
713 StreamCodec::FlushOp flush) {
717 auto buffer = IOBuf::create(bufferSize);
718 buffer->append(buffer->capacity());
719 MutableByteRange output{buffer->writableData(), buffer->length()};
721 result = codec->uncompressStream(data, output, flush);
722 buffer->trimEnd(output.size());
723 queue.append(std::move(buffer));
725 } while (queue.tailroom() == 0 && !result);
726 return std::make_pair(result, queue.move());
729 void StreamingCompressionTest::runResetStreamTest(DataHolder const& dh) {
730 auto const input = dh.data(uncompressedLength_);
731 // Compress some but leave state unclean
732 codec_->resetStream(uncompressedLength_);
733 compressSome(codec_.get(), input, chunkSize_, StreamCodec::FlushOp::NONE);
734 // Reset stream and compress all
735 codec_->resetStream();
737 compressSome(codec_.get(), input, chunkSize_, StreamCodec::FlushOp::END);
738 auto const uncompressed = codec_->uncompress(compressed.get(), input.size());
739 EXPECT_EQ(dh.hash(uncompressedLength_), hashIOBuf(uncompressed.get()));
742 TEST_P(StreamingCompressionTest, resetStream) {
743 runResetStreamTest(constantDataHolder);
744 runResetStreamTest(randomDataHolder);
747 void StreamingCompressionTest::runCompressStreamTest(
748 const folly::io::test::DataHolder& dh) {
749 auto const inputs = split(dh.data(uncompressedLength_));
752 codec_->resetStream(uncompressedLength_);
753 // Compress many inputs in a row
754 for (auto const input : inputs) {
755 queue.append(compressSome(
756 codec_.get(), input, chunkSize_, StreamCodec::FlushOp::NONE));
758 // Finish the operation with empty input.
761 compressSome(codec_.get(), empty, chunkSize_, StreamCodec::FlushOp::END));
763 auto const uncompressed = codec_->uncompress(queue.front());
764 EXPECT_EQ(dh.hash(uncompressedLength_), hashIOBuf(uncompressed.get()));
767 TEST_P(StreamingCompressionTest, compressStream) {
768 runCompressStreamTest(constantDataHolder);
769 runCompressStreamTest(randomDataHolder);
772 void StreamingCompressionTest::runUncompressStreamTest(
773 const folly::io::test::DataHolder& dh) {
774 auto const data = IOBuf::wrapBuffer(dh.data(uncompressedLength_));
775 // Concatenate 3 compressed frames in a row
776 auto compressed = codec_->compress(data.get());
777 compressed->prependChain(codec_->compress(data.get()));
778 compressed->prependChain(codec_->compress(data.get()));
779 // Pass all 3 compressed frames in one input buffer
780 auto input = compressed->coalesce();
781 // Uncompress the first frame
782 codec_->resetStream(data->computeChainDataLength());
784 auto const result = uncompressSome(
785 codec_.get(), input, chunkSize_, StreamCodec::FlushOp::FLUSH);
786 ASSERT_TRUE(result.first);
787 ASSERT_EQ(hashIOBuf(data.get()), hashIOBuf(result.second.get()));
789 // Uncompress the second frame
790 codec_->resetStream();
792 auto const result = uncompressSome(
793 codec_.get(), input, chunkSize_, StreamCodec::FlushOp::END);
794 ASSERT_TRUE(result.first);
795 ASSERT_EQ(hashIOBuf(data.get()), hashIOBuf(result.second.get()));
797 // Uncompress the third frame
798 codec_->resetStream();
800 auto const result = uncompressSome(
801 codec_.get(), input, chunkSize_, StreamCodec::FlushOp::FLUSH);
802 ASSERT_TRUE(result.first);
803 ASSERT_EQ(hashIOBuf(data.get()), hashIOBuf(result.second.get()));
805 EXPECT_TRUE(input.empty());
808 TEST_P(StreamingCompressionTest, uncompressStream) {
809 runUncompressStreamTest(constantDataHolder);
810 runUncompressStreamTest(randomDataHolder);
813 void StreamingCompressionTest::runFlushTest(DataHolder const& dh) {
814 auto const inputs = split(dh.data(uncompressedLength_));
815 auto uncodec = getStreamCodec(codec_->type());
817 codec_->resetStream();
818 for (auto input : inputs) {
819 // Compress some data and flush the stream
820 auto compressed = compressSome(
821 codec_.get(), input, chunkSize_, StreamCodec::FlushOp::FLUSH);
822 auto compressedRange = compressed->coalesce();
823 // Uncompress the compressed data
824 auto result = uncompressSome(
828 StreamCodec::FlushOp::FLUSH);
829 // All compressed data should have been consumed
830 EXPECT_TRUE(compressedRange.empty());
831 // The frame isn't complete
832 EXPECT_FALSE(result.first);
833 // The uncompressed data should be exactly the input data
834 EXPECT_EQ(input.size(), result.second->computeChainDataLength());
835 auto const data = IOBuf::wrapBuffer(input);
836 EXPECT_EQ(hashIOBuf(data.get()), hashIOBuf(result.second.get()));
840 TEST_P(StreamingCompressionTest, testFlush) {
841 runFlushTest(constantDataHolder);
842 runFlushTest(randomDataHolder);
845 INSTANTIATE_TEST_CASE_P(
846 StreamingCompressionTest,
847 StreamingCompressionTest,
849 testing::Values(0, 1, 12, 22, 27),
850 testing::Values(12, 17, 20),
851 testing::ValuesIn(availableStreamCodecs())));
853 class AutomaticCodecTest : public testing::TestWithParam<CodecType> {
855 void SetUp() override {
856 codec_ = getCodec(GetParam());
857 auto_ = getAutoUncompressionCodec();
860 void runSimpleTest(const DataHolder& dh);
862 std::unique_ptr<Codec> codec_;
863 std::unique_ptr<Codec> auto_;
866 void AutomaticCodecTest::runSimpleTest(const DataHolder& dh) {
867 constexpr uint64_t uncompressedLength = 1000;
868 auto original = IOBuf::wrapBuffer(dh.data(uncompressedLength));
869 auto compressed = codec_->compress(original.get());
871 if (!codec_->needsUncompressedLength()) {
872 auto uncompressed = auto_->uncompress(compressed.get());
873 EXPECT_EQ(uncompressedLength, uncompressed->computeChainDataLength());
874 EXPECT_EQ(dh.hash(uncompressedLength), hashIOBuf(uncompressed.get()));
877 auto uncompressed = auto_->uncompress(compressed.get(), uncompressedLength);
878 EXPECT_EQ(uncompressedLength, uncompressed->computeChainDataLength());
879 EXPECT_EQ(dh.hash(uncompressedLength), hashIOBuf(uncompressed.get()));
881 ASSERT_GE(compressed->computeChainDataLength(), 8);
882 for (size_t i = 0; i < 8; ++i) {
883 auto split = compressed->clone();
884 auto rest = compressed->clone();
885 split->trimEnd(split->length() - i);
887 split->appendChain(std::move(rest));
888 auto uncompressed = auto_->uncompress(split.get(), uncompressedLength);
889 EXPECT_EQ(uncompressedLength, uncompressed->computeChainDataLength());
890 EXPECT_EQ(dh.hash(uncompressedLength), hashIOBuf(uncompressed.get()));
894 TEST_P(AutomaticCodecTest, RandomData) {
895 runSimpleTest(randomDataHolder);
898 TEST_P(AutomaticCodecTest, ConstantData) {
899 runSimpleTest(constantDataHolder);
902 TEST_P(AutomaticCodecTest, ValidPrefixes) {
903 const auto prefixes = codec_->validPrefixes();
904 for (const auto& prefix : prefixes) {
905 EXPECT_FALSE(prefix.empty());
906 // Ensure that all strings are at least 8 bytes for LZMA2.
907 // The bytes after the prefix should be ignored by `canUncompress()`.
908 IOBuf data{IOBuf::COPY_BUFFER, prefix, 0, 8};
910 EXPECT_TRUE(codec_->canUncompress(&data));
911 EXPECT_TRUE(auto_->canUncompress(&data));
915 TEST_P(AutomaticCodecTest, NeedsUncompressedLength) {
916 if (codec_->needsUncompressedLength()) {
917 EXPECT_TRUE(auto_->needsUncompressedLength());
921 TEST_P(AutomaticCodecTest, maxUncompressedLength) {
922 EXPECT_LE(codec_->maxUncompressedLength(), auto_->maxUncompressedLength());
925 TEST_P(AutomaticCodecTest, DefaultCodec) {
926 const uint64_t length = 42;
927 std::vector<std::unique_ptr<Codec>> codecs;
928 codecs.push_back(getCodec(CodecType::ZSTD));
929 auto automatic = getAutoUncompressionCodec(std::move(codecs));
930 auto original = IOBuf::wrapBuffer(constantDataHolder.data(length));
931 auto compressed = codec_->compress(original.get());
932 auto decompressed = automatic->uncompress(compressed.get());
934 EXPECT_EQ(constantDataHolder.hash(length), hashIOBuf(decompressed.get()));
938 class CustomCodec : public Codec {
940 static std::unique_ptr<Codec> create(std::string prefix, CodecType type) {
941 return std::make_unique<CustomCodec>(std::move(prefix), type);
943 explicit CustomCodec(std::string prefix, CodecType type)
944 : Codec(CodecType::USER_DEFINED),
945 prefix_(std::move(prefix)),
946 codec_(getCodec(type)) {}
949 std::vector<std::string> validPrefixes() const override {
953 uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override {
954 return codec_->maxCompressedLength(uncompressedLength) + prefix_.size();
957 bool canUncompress(const IOBuf* data, Optional<uint64_t>) const override {
958 auto clone = data->cloneCoalescedAsValue();
959 if (clone.length() < prefix_.size()) {
962 return memcmp(clone.data(), prefix_.data(), prefix_.size()) == 0;
965 std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override {
966 auto result = IOBuf::copyBuffer(prefix_);
967 result->appendChain(codec_->compress(data));
968 EXPECT_TRUE(canUncompress(result.get(), data->computeChainDataLength()));
972 std::unique_ptr<IOBuf> doUncompress(
974 Optional<uint64_t> uncompressedLength) override {
975 EXPECT_TRUE(canUncompress(data, uncompressedLength));
976 auto clone = data->cloneCoalescedAsValue();
977 clone.trimStart(prefix_.size());
978 return codec_->uncompress(&clone, uncompressedLength);
982 std::unique_ptr<Codec> codec_;
986 TEST_P(AutomaticCodecTest, CustomCodec) {
987 const uint64_t length = 42;
988 auto ab = CustomCodec::create("ab", CodecType::ZSTD);
989 std::vector<std::unique_ptr<Codec>> codecs;
990 codecs.push_back(CustomCodec::create("ab", CodecType::ZSTD));
991 auto automatic = getAutoUncompressionCodec(std::move(codecs));
992 auto original = IOBuf::wrapBuffer(constantDataHolder.data(length));
994 auto abCompressed = ab->compress(original.get());
995 auto abDecompressed = automatic->uncompress(abCompressed.get());
996 EXPECT_TRUE(automatic->canUncompress(abCompressed.get()));
997 EXPECT_FALSE(auto_->canUncompress(abCompressed.get()));
998 EXPECT_EQ(constantDataHolder.hash(length), hashIOBuf(abDecompressed.get()));
1000 auto compressed = codec_->compress(original.get());
1001 auto decompressed = automatic->uncompress(compressed.get());
1002 EXPECT_EQ(constantDataHolder.hash(length), hashIOBuf(decompressed.get()));
1005 TEST_P(AutomaticCodecTest, CustomDefaultCodec) {
1006 const uint64_t length = 42;
1007 auto none = CustomCodec::create("none", CodecType::NO_COMPRESSION);
1008 std::vector<std::unique_ptr<Codec>> codecs;
1009 codecs.push_back(CustomCodec::create("none", CodecType::NO_COMPRESSION));
1010 codecs.push_back(getCodec(CodecType::LZ4_FRAME));
1011 auto automatic = getAutoUncompressionCodec(std::move(codecs));
1012 auto original = IOBuf::wrapBuffer(constantDataHolder.data(length));
1014 auto noneCompressed = none->compress(original.get());
1015 auto noneDecompressed = automatic->uncompress(noneCompressed.get());
1016 EXPECT_TRUE(automatic->canUncompress(noneCompressed.get()));
1017 EXPECT_FALSE(auto_->canUncompress(noneCompressed.get()));
1018 EXPECT_EQ(constantDataHolder.hash(length), hashIOBuf(noneDecompressed.get()));
1020 auto compressed = codec_->compress(original.get());
1021 auto decompressed = automatic->uncompress(compressed.get());
1022 EXPECT_EQ(constantDataHolder.hash(length), hashIOBuf(decompressed.get()));
// Checks that neither the parameterized codec nor the automatic codec claims
// it can uncompress `buf`, with and without a stated uncompressed length of 1.
1025 TEST_P(AutomaticCodecTest, canUncompressOneBytes) {
1026 // No default codec can uncompress a single byte.
1027 IOBuf buf{IOBuf::CREATE, 1};
1029 EXPECT_FALSE(codec_->canUncompress(&buf, 1));
1030 EXPECT_FALSE(codec_->canUncompress(&buf, folly::none));
1031 EXPECT_FALSE(auto_->canUncompress(&buf, 1));
1032 EXPECT_FALSE(auto_->canUncompress(&buf, folly::none));
1035 INSTANTIATE_TEST_CASE_P(
1039 CodecType::LZ4_FRAME,
1046 TEST(ValidPrefixesTest, CustomCodec) {
1047 std::vector<std::unique_ptr<Codec>> codecs;
1048 codecs.push_back(CustomCodec::create("none", CodecType::NO_COMPRESSION));
1049 const auto none = getAutoUncompressionCodec(std::move(codecs));
1050 const auto prefixes = none->validPrefixes();
1051 const auto it = std::find(prefixes.begin(), prefixes.end(), "none");
1052 EXPECT_TRUE(it != prefixes.end());
1055 #define EXPECT_THROW_IF_DEBUG(statement, expected_exception) \
1058 EXPECT_THROW((statement), expected_exception); \
1060 EXPECT_NO_THROW((statement)); \
// Registers custom codecs with prefixes "abc" and then "ab" (the second is a
// prefix of the first) and runs getAutoUncompressionCodec() under
// EXPECT_THROW_IF_DEBUG with std::invalid_argument.
1064 TEST(CheckCompatibleTest, SimplePrefixSecond) {
1065 std::vector<std::unique_ptr<Codec>> codecs;
1066 codecs.push_back(CustomCodec::create("abc", CodecType::NO_COMPRESSION));
1067 codecs.push_back(CustomCodec::create("ab", CodecType::NO_COMPRESSION));
1068 EXPECT_THROW_IF_DEBUG(
1069 getAutoUncompressionCodec(std::move(codecs)), std::invalid_argument);
// Same pair of prefixes as SimplePrefixSecond but registered in the opposite
// order ("ab" first, then "abc"); runs getAutoUncompressionCodec() under
// EXPECT_THROW_IF_DEBUG with std::invalid_argument.
1072 TEST(CheckCompatibleTest, SimplePrefixFirst) {
1073 std::vector<std::unique_ptr<Codec>> codecs;
1074 codecs.push_back(CustomCodec::create("ab", CodecType::NO_COMPRESSION));
1075 codecs.push_back(CustomCodec::create("abc", CodecType::NO_COMPRESSION));
1076 EXPECT_THROW_IF_DEBUG(
1077 getAutoUncompressionCodec(std::move(codecs)), std::invalid_argument);
// Registers a single custom codec whose prefix is the empty string and runs
// getAutoUncompressionCodec() under EXPECT_THROW_IF_DEBUG with
// std::invalid_argument.
1080 TEST(CheckCompatibleTest, Empty) {
1081 std::vector<std::unique_ptr<Codec>> codecs;
1082 codecs.push_back(CustomCodec::create("", CodecType::NO_COMPRESSION));
1083 EXPECT_THROW_IF_DEBUG(
1084 getAutoUncompressionCodec(std::move(codecs)), std::invalid_argument);
// Registers a custom codec with the three-byte prefix 28 B5 2F and runs
// getAutoUncompressionCodec() under EXPECT_THROW_IF_DEBUG with
// std::invalid_argument.
// NOTE(review): these bytes look like the first three bytes of the zstd frame
// magic (a strict prefix of a built-in codec's prefix) - confirm.
1087 TEST(CheckCompatibleTest, ZstdPrefix) {
1088 std::vector<std::unique_ptr<Codec>> codecs;
1089 codecs.push_back(CustomCodec::create("\x28\xB5\x2F", CodecType::ZSTD));
1090 EXPECT_THROW_IF_DEBUG(
1091 getAutoUncompressionCodec(std::move(codecs)), std::invalid_argument);
// Registers a custom codec with the four-byte prefix 28 B5 2F FD and runs
// getAutoUncompressionCodec() under EXPECT_THROW_IF_DEBUG with
// std::invalid_argument.
// NOTE(review): these bytes look like the complete zstd frame magic, i.e. a
// duplicate of a built-in codec's prefix - confirm.
1094 TEST(CheckCompatibleTest, ZstdDuplicate) {
1095 std::vector<std::unique_ptr<Codec>> codecs;
1096 codecs.push_back(CustomCodec::create("\x28\xB5\x2F\xFD", CodecType::ZSTD));
1097 EXPECT_THROW_IF_DEBUG(
1098 getAutoUncompressionCodec(std::move(codecs)), std::invalid_argument);
// Registers a custom codec whose prefix starts with bytes 18 76 followed by
// "zzasdf" and runs getAutoUncompressionCodec() under EXPECT_THROW_IF_DEBUG
// with std::invalid_argument.
// NOTE(review): judging by the test name, the leading two bytes are presumably
// a valid zlib header, making a built-in prefix a prefix of this one - confirm.
1101 TEST(CheckCompatibleTest, ZlibIsPrefix) {
1102 std::vector<std::unique_ptr<Codec>> codecs;
1103 codecs.push_back(CustomCodec::create("\x18\x76zzasdf", CodecType::ZSTD));
1104 EXPECT_THROW_IF_DEBUG(
1105 getAutoUncompressionCodec(std::move(codecs)), std::invalid_argument);
1108 #if FOLLY_HAVE_LIBZSTD
1110 TEST(ZstdTest, BackwardCompatible) {
1111 auto codec = getCodec(CodecType::ZSTD);
1113 auto const data = IOBuf::wrapBuffer(randomDataHolder.data(size_t(1) << 20));
1114 auto compressed = codec->compress(data.get());
1115 compressed->coalesce();
1118 ZSTD_getDecompressedSize(compressed->data(), compressed->length()));
1122 IOBuf::wrapBuffer(randomDataHolder.data(size_t(100) << 20));
1123 auto compressed = codec->compress(data.get());
1124 compressed->coalesce();
1127 ZSTD_getDecompressedSize(compressed->data(), compressed->length()));
1134 } // namespace folly
1136 int main(int argc, char *argv[]) {
1137 testing::InitGoogleTest(&argc, argv);
1138 gflags::ParseCommandLineFlags(&argc, &argv, true);
1140 auto ret = RUN_ALL_TESTS();
1142 folly::runBenchmarksOnFlag();