2 * Copyright 2017 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #include <folly/io/Compression.h>
23 #include <unordered_map>
26 #include <boost/noncopyable.hpp>
27 #include <glog/logging.h>
29 #include <folly/Benchmark.h>
30 #include <folly/Hash.h>
31 #include <folly/Memory.h>
32 #include <folly/Random.h>
33 #include <folly/Varint.h>
34 #include <folly/io/IOBufQueue.h>
35 #include <folly/portability/GTest.h>
37 #if FOLLY_HAVE_LIBZSTD
41 namespace folly { namespace io { namespace test {
43 class DataHolder : private boost::noncopyable {
45 uint64_t hash(size_t size) const;
46 ByteRange data(size_t size) const;
49 explicit DataHolder(size_t sizeLog2);
51 std::unique_ptr<uint8_t[]> data_;
52 mutable std::unordered_map<uint64_t, uint64_t> hashCache_;
55 DataHolder::DataHolder(size_t sizeLog2)
56 : size_(size_t(1) << sizeLog2),
57 data_(new uint8_t[size_]) {
60 uint64_t DataHolder::hash(size_t size) const {
61 CHECK_LE(size, size_);
62 auto p = hashCache_.find(size);
63 if (p != hashCache_.end()) {
67 uint64_t h = folly::hash::fnv64_buf(data_.get(), size);
72 ByteRange DataHolder::data(size_t size) const {
73 CHECK_LE(size, size_);
74 return ByteRange(data_.get(), size);
77 uint64_t hashIOBuf(const IOBuf* buf) {
78 uint64_t h = folly::hash::FNV_64_HASH_START;
79 for (auto& range : *buf) {
80 h = folly::hash::fnv64_buf(range.data(), range.size(), h);
85 class RandomDataHolder : public DataHolder {
87 explicit RandomDataHolder(size_t sizeLog2);
90 RandomDataHolder::RandomDataHolder(size_t sizeLog2)
91 : DataHolder(sizeLog2) {
92 static constexpr size_t numThreadsLog2 = 3;
93 static constexpr size_t numThreads = size_t(1) << numThreadsLog2;
95 uint32_t seed = randomNumberSeed();
97 std::vector<std::thread> threads;
98 threads.reserve(numThreads);
99 for (size_t t = 0; t < numThreads; ++t) {
100 threads.emplace_back([this, seed, t, sizeLog2] {
101 std::mt19937 rng(seed + t);
102 size_t countLog2 = sizeLog2 - numThreadsLog2;
103 size_t start = size_t(t) << countLog2;
104 for (size_t i = 0; i < countLog2; ++i) {
105 this->data_[start + i] = rng();
110 for (auto& t : threads) {
115 class ConstantDataHolder : public DataHolder {
117 explicit ConstantDataHolder(size_t sizeLog2);
120 ConstantDataHolder::ConstantDataHolder(size_t sizeLog2)
121 : DataHolder(sizeLog2) {
122 memset(data_.get(), 'a', size_);
125 constexpr size_t dataSizeLog2 = 27; // 128MiB
126 RandomDataHolder randomDataHolder(dataSizeLog2);
127 ConstantDataHolder constantDataHolder(dataSizeLog2);
129 // The intersection of the provided codecs & those that are compiled in.
130 static std::vector<CodecType> supportedCodecs(std::vector<CodecType> const& v) {
131 std::vector<CodecType> supported;
136 std::back_inserter(supported),
142 // All compiled-in compression codecs.
143 static std::vector<CodecType> availableCodecs() {
144 std::vector<CodecType> codecs;
146 for (size_t i = 0; i < static_cast<size_t>(CodecType::NUM_CODEC_TYPES); ++i) {
147 auto type = static_cast<CodecType>(i);
148 if (hasCodec(type)) {
149 codecs.push_back(type);
156 static std::vector<CodecType> availableStreamCodecs() {
157 std::vector<CodecType> codecs;
159 for (size_t i = 0; i < static_cast<size_t>(CodecType::NUM_CODEC_TYPES); ++i) {
160 auto type = static_cast<CodecType>(i);
161 if (hasStreamCodec(type)) {
162 codecs.push_back(type);
169 TEST(CompressionTestNeedsUncompressedLength, Simple) {
170 static const struct { CodecType type; bool needsUncompressedLength; }
172 { CodecType::NO_COMPRESSION, false },
173 { CodecType::LZ4, true },
174 { CodecType::SNAPPY, false },
175 { CodecType::ZLIB, false },
176 { CodecType::LZ4_VARINT_SIZE, false },
177 { CodecType::LZMA2, false },
178 { CodecType::LZMA2_VARINT_SIZE, false },
179 { CodecType::ZSTD, false },
180 { CodecType::GZIP, false },
181 { CodecType::LZ4_FRAME, false },
182 { CodecType::BZIP2, false },
185 for (auto const& test : expectations) {
186 if (hasCodec(test.type)) {
187 EXPECT_EQ(getCodec(test.type)->needsUncompressedLength(),
188 test.needsUncompressedLength);
193 class CompressionTest
194 : public testing::TestWithParam<std::tr1::tuple<int, int, CodecType>> {
196 void SetUp() override {
197 auto tup = GetParam();
198 uncompressedLength_ = uint64_t(1) << std::tr1::get<0>(tup);
199 chunks_ = std::tr1::get<1>(tup);
200 codec_ = getCodec(std::tr1::get<2>(tup));
203 void runSimpleIOBufTest(const DataHolder& dh);
205 void runSimpleStringTest(const DataHolder& dh);
208 std::unique_ptr<IOBuf> split(std::unique_ptr<IOBuf> data) const;
210 uint64_t uncompressedLength_;
212 std::unique_ptr<Codec> codec_;
215 void CompressionTest::runSimpleIOBufTest(const DataHolder& dh) {
216 const auto original = split(IOBuf::wrapBuffer(dh.data(uncompressedLength_)));
217 const auto compressed = split(codec_->compress(original.get()));
218 if (!codec_->needsUncompressedLength()) {
219 auto uncompressed = codec_->uncompress(compressed.get());
220 EXPECT_EQ(uncompressedLength_, uncompressed->computeChainDataLength());
221 EXPECT_EQ(dh.hash(uncompressedLength_), hashIOBuf(uncompressed.get()));
224 auto uncompressed = codec_->uncompress(compressed.get(),
225 uncompressedLength_);
226 EXPECT_EQ(uncompressedLength_, uncompressed->computeChainDataLength());
227 EXPECT_EQ(dh.hash(uncompressedLength_), hashIOBuf(uncompressed.get()));
231 void CompressionTest::runSimpleStringTest(const DataHolder& dh) {
232 const auto original = std::string(
233 reinterpret_cast<const char*>(dh.data(uncompressedLength_).data()),
234 uncompressedLength_);
235 const auto compressed = codec_->compress(original);
236 if (!codec_->needsUncompressedLength()) {
237 auto uncompressed = codec_->uncompress(compressed);
238 EXPECT_EQ(uncompressedLength_, uncompressed.length());
239 EXPECT_EQ(uncompressed, original);
242 auto uncompressed = codec_->uncompress(compressed, uncompressedLength_);
243 EXPECT_EQ(uncompressedLength_, uncompressed.length());
244 EXPECT_EQ(uncompressed, original);
248 // Uniformly split data into (potentially empty) chunks.
249 std::unique_ptr<IOBuf> CompressionTest::split(
250 std::unique_ptr<IOBuf> data) const {
251 if (data->isChained()) {
255 const size_t size = data->computeChainDataLength();
257 std::multiset<size_t> splits;
258 for (size_t i = 1; i < chunks_; ++i) {
259 splits.insert(Random::rand64(size));
262 folly::IOBufQueue result;
265 for (size_t split : splits) {
266 result.append(IOBuf::copyBuffer(data->data() + offset, split - offset));
269 result.append(IOBuf::copyBuffer(data->data() + offset, size - offset));
271 return result.move();
274 TEST_P(CompressionTest, RandomData) {
275 runSimpleIOBufTest(randomDataHolder);
278 TEST_P(CompressionTest, ConstantData) {
279 runSimpleIOBufTest(constantDataHolder);
282 TEST_P(CompressionTest, RandomDataString) {
283 runSimpleStringTest(randomDataHolder);
286 TEST_P(CompressionTest, ConstantDataString) {
287 runSimpleStringTest(constantDataHolder);
290 INSTANTIATE_TEST_CASE_P(
294 testing::Values(0, 1, 12, 22, 25, 27),
295 testing::Values(1, 2, 3, 8, 65),
296 testing::ValuesIn(availableCodecs())));
298 class CompressionVarintTest
299 : public testing::TestWithParam<std::tr1::tuple<int, CodecType>> {
301 void SetUp() override {
302 auto tup = GetParam();
303 uncompressedLength_ = uint64_t(1) << std::tr1::get<0>(tup);
304 codec_ = getCodec(std::tr1::get<1>(tup));
307 void runSimpleTest(const DataHolder& dh);
309 uint64_t uncompressedLength_;
310 std::unique_ptr<Codec> codec_;
313 inline uint64_t oneBasedMsbPos(uint64_t number) {
315 for (; number > 0; ++pos, number >>= 1) {
320 void CompressionVarintTest::runSimpleTest(const DataHolder& dh) {
321 auto original = IOBuf::wrapBuffer(dh.data(uncompressedLength_));
322 auto compressed = codec_->compress(original.get());
326 std::max(uint64_t(9), oneBasedMsbPos(uncompressedLength_)) / 9UL);
327 auto tinyBuf = IOBuf::copyBuffer(compressed->data(),
328 std::min(compressed->length(), breakPoint));
329 compressed->trimStart(breakPoint);
330 tinyBuf->prependChain(std::move(compressed));
331 compressed = std::move(tinyBuf);
333 auto uncompressed = codec_->uncompress(compressed.get());
335 EXPECT_EQ(uncompressedLength_, uncompressed->computeChainDataLength());
336 EXPECT_EQ(dh.hash(uncompressedLength_), hashIOBuf(uncompressed.get()));
339 TEST_P(CompressionVarintTest, RandomData) {
340 runSimpleTest(randomDataHolder);
343 TEST_P(CompressionVarintTest, ConstantData) {
344 runSimpleTest(constantDataHolder);
347 INSTANTIATE_TEST_CASE_P(
348 CompressionVarintTest,
349 CompressionVarintTest,
351 testing::Values(0, 1, 12, 22, 25, 27),
352 testing::ValuesIn(supportedCodecs({
353 CodecType::LZ4_VARINT_SIZE,
354 CodecType::LZMA2_VARINT_SIZE,
357 class CompressionCorruptionTest : public testing::TestWithParam<CodecType> {
359 void SetUp() override { codec_ = getCodec(GetParam()); }
361 void runSimpleTest(const DataHolder& dh);
363 std::unique_ptr<Codec> codec_;
366 void CompressionCorruptionTest::runSimpleTest(const DataHolder& dh) {
367 constexpr uint64_t uncompressedLength = 42;
368 auto original = IOBuf::wrapBuffer(dh.data(uncompressedLength));
369 auto compressed = codec_->compress(original.get());
371 if (!codec_->needsUncompressedLength()) {
372 auto uncompressed = codec_->uncompress(compressed.get());
373 EXPECT_EQ(uncompressedLength, uncompressed->computeChainDataLength());
374 EXPECT_EQ(dh.hash(uncompressedLength), hashIOBuf(uncompressed.get()));
377 auto uncompressed = codec_->uncompress(compressed.get(),
379 EXPECT_EQ(uncompressedLength, uncompressed->computeChainDataLength());
380 EXPECT_EQ(dh.hash(uncompressedLength), hashIOBuf(uncompressed.get()));
383 EXPECT_THROW(codec_->uncompress(compressed.get(), uncompressedLength + 1),
386 auto corrupted = compressed->clone();
387 corrupted->unshare();
388 // Truncate the last character
389 corrupted->prev()->trimEnd(1);
390 if (!codec_->needsUncompressedLength()) {
391 EXPECT_THROW(codec_->uncompress(corrupted.get()),
395 EXPECT_THROW(codec_->uncompress(corrupted.get(), uncompressedLength),
398 corrupted = compressed->clone();
399 corrupted->unshare();
400 // Corrupt the first character
401 ++(corrupted->writableData()[0]);
403 if (!codec_->needsUncompressedLength()) {
404 EXPECT_THROW(codec_->uncompress(corrupted.get()),
408 EXPECT_THROW(codec_->uncompress(corrupted.get(), uncompressedLength),
412 TEST_P(CompressionCorruptionTest, RandomData) {
413 runSimpleTest(randomDataHolder);
416 TEST_P(CompressionCorruptionTest, ConstantData) {
417 runSimpleTest(constantDataHolder);
420 INSTANTIATE_TEST_CASE_P(
421 CompressionCorruptionTest,
422 CompressionCorruptionTest,
424 // NO_COMPRESSION can't detect corruption
425 // LZ4 can't detect corruption reliably (sigh)
431 CodecType::LZ4_FRAME,
435 class StreamingUnitTest : public testing::TestWithParam<CodecType> {
437 void SetUp() override {
438 codec_ = getStreamCodec(GetParam());
441 std::unique_ptr<StreamCodec> codec_;
444 TEST_P(StreamingUnitTest, maxCompressedLength) {
445 EXPECT_EQ(0, codec_->maxCompressedLength(0));
446 for (uint64_t const length : {1, 10, 100, 1000, 10000, 100000, 1000000}) {
447 EXPECT_GE(codec_->maxCompressedLength(length), length);
451 TEST_P(StreamingUnitTest, getUncompressedLength) {
452 auto const empty = IOBuf::create(0);
453 EXPECT_EQ(uint64_t(0), codec_->getUncompressedLength(empty.get()));
454 EXPECT_EQ(uint64_t(0), codec_->getUncompressedLength(empty.get(), 0));
456 auto const data = IOBuf::wrapBuffer(randomDataHolder.data(100));
457 auto const compressed = codec_->compress(data.get());
459 EXPECT_ANY_THROW(codec_->getUncompressedLength(data.get(), 0));
460 if (auto const length = codec_->getUncompressedLength(data.get())) {
461 EXPECT_EQ(100, *length);
463 EXPECT_EQ(uint64_t(100), codec_->getUncompressedLength(data.get(), 100));
464 // If the uncompressed length is stored in the frame, then make sure it throws
465 // when it is given the wrong length.
466 if (codec_->getUncompressedLength(data.get()) == uint64_t(100)) {
467 EXPECT_ANY_THROW(codec_->getUncompressedLength(data.get(), 200));
471 TEST_P(StreamingUnitTest, emptyData) {
473 auto buffer = IOBuf::create(1);
474 buffer->append(buffer->capacity());
475 MutableByteRange output{};
477 // Test compressing empty data in one pass
478 EXPECT_TRUE(codec_->compressStream(input, output, StreamCodec::FlushOp::END));
479 codec_->resetStream(0);
480 EXPECT_TRUE(codec_->compressStream(input, output, StreamCodec::FlushOp::END));
481 codec_->resetStream();
482 output = {buffer->writableData(), buffer->length()};
483 EXPECT_TRUE(codec_->compressStream(input, output, StreamCodec::FlushOp::END));
484 EXPECT_EQ(buffer->length(), output.size());
486 // Test compressing empty data with multiple calls to compressStream()
487 codec_->resetStream();
489 EXPECT_FALSE(codec_->compressStream(input, output));
491 codec_->compressStream(input, output, StreamCodec::FlushOp::FLUSH));
492 EXPECT_TRUE(codec_->compressStream(input, output, StreamCodec::FlushOp::END));
493 codec_->resetStream();
494 output = {buffer->writableData(), buffer->length()};
495 EXPECT_FALSE(codec_->compressStream(input, output));
497 codec_->compressStream(input, output, StreamCodec::FlushOp::FLUSH));
498 EXPECT_TRUE(codec_->compressStream(input, output, StreamCodec::FlushOp::END));
499 EXPECT_EQ(buffer->length(), output.size());
501 // Test uncompressing empty data
503 codec_->resetStream();
504 EXPECT_TRUE(codec_->uncompressStream(input, output));
505 codec_->resetStream();
507 codec_->uncompressStream(input, output, StreamCodec::FlushOp::FLUSH));
508 codec_->resetStream();
510 codec_->uncompressStream(input, output, StreamCodec::FlushOp::END));
511 codec_->resetStream(0);
512 EXPECT_TRUE(codec_->uncompressStream(input, output));
513 codec_->resetStream(0);
515 codec_->uncompressStream(input, output, StreamCodec::FlushOp::FLUSH));
516 codec_->resetStream(0);
518 codec_->uncompressStream(input, output, StreamCodec::FlushOp::END));
521 TEST_P(StreamingUnitTest, noForwardProgressOkay) {
522 auto inBuffer = IOBuf::create(2);
523 inBuffer->writableData()[0] = 'a';
524 inBuffer->writableData()[0] = 'a';
526 auto input = inBuffer->coalesce();
527 auto compressed = codec_->compress(inBuffer.get());
529 auto outBuffer = IOBuf::create(codec_->maxCompressedLength(2));
530 MutableByteRange output{outBuffer->writableTail(), outBuffer->tailroom()};
532 ByteRange emptyInput;
533 MutableByteRange emptyOutput;
535 // Compress some data to avoid empty data special casing
536 codec_->resetStream();
537 while (!input.empty()) {
538 codec_->compressStream(input, output);
540 // empty input and output is okay for flush NONE and FLUSH.
541 codec_->compressStream(emptyInput, emptyOutput);
542 codec_->compressStream(emptyInput, emptyOutput, StreamCodec::FlushOp::FLUSH);
544 codec_->resetStream();
545 input = inBuffer->coalesce();
546 output = {outBuffer->writableTail(), outBuffer->tailroom()};
547 while (!input.empty()) {
548 codec_->compressStream(input, output);
550 // empty input and output is okay for flush END.
551 codec_->compressStream(emptyInput, emptyOutput, StreamCodec::FlushOp::END);
553 codec_->resetStream();
554 input = compressed->coalesce();
555 input.uncheckedSubtract(1); // Remove last byte so the operation is incomplete
556 output = {inBuffer->writableData(), inBuffer->length()};
557 // Uncompress some data to avoid empty data special casing
558 while (!input.empty()) {
559 EXPECT_FALSE(codec_->uncompressStream(input, output));
561 // empty input and output is okay for all flush values.
562 EXPECT_FALSE(codec_->uncompressStream(emptyInput, emptyOutput));
563 EXPECT_FALSE(codec_->uncompressStream(
564 emptyInput, emptyOutput, StreamCodec::FlushOp::FLUSH));
565 EXPECT_FALSE(codec_->uncompressStream(
566 emptyInput, emptyOutput, StreamCodec::FlushOp::END));
569 TEST_P(StreamingUnitTest, stateTransitions) {
570 auto inBuffer = IOBuf::create(1);
571 inBuffer->writableData()[0] = 'a';
573 auto compressed = codec_->compress(inBuffer.get());
574 ByteRange const in = compressed->coalesce();
575 auto outBuffer = IOBuf::create(codec_->maxCompressedLength(in.size()));
576 MutableByteRange const out{outBuffer->writableTail(), outBuffer->tailroom()};
579 StreamCodec::FlushOp flushOp = StreamCodec::FlushOp::NONE,
580 bool empty = false) {
582 auto output = empty ? MutableByteRange{} : out;
583 return codec_->compressStream(input, output, flushOp);
585 auto uncompress = [&](
586 StreamCodec::FlushOp flushOp = StreamCodec::FlushOp::NONE,
587 bool empty = false) {
589 auto output = empty ? MutableByteRange{} : out;
590 return codec_->uncompressStream(input, output, flushOp);
594 codec_->resetStream();
595 EXPECT_FALSE(compress());
596 EXPECT_FALSE(compress());
597 EXPECT_TRUE(compress(StreamCodec::FlushOp::FLUSH));
598 EXPECT_FALSE(compress());
599 EXPECT_TRUE(compress(StreamCodec::FlushOp::END));
600 // uncompression flow
601 codec_->resetStream();
602 EXPECT_FALSE(uncompress(StreamCodec::FlushOp::NONE, true));
603 codec_->resetStream();
604 EXPECT_FALSE(uncompress(StreamCodec::FlushOp::FLUSH, true));
605 codec_->resetStream();
606 EXPECT_FALSE(uncompress(StreamCodec::FlushOp::NONE, true));
607 codec_->resetStream();
608 EXPECT_FALSE(uncompress(StreamCodec::FlushOp::NONE, true));
609 codec_->resetStream();
610 EXPECT_TRUE(uncompress(StreamCodec::FlushOp::FLUSH));
611 // compress -> uncompress
612 codec_->resetStream();
613 EXPECT_FALSE(compress());
614 EXPECT_THROW(uncompress(), std::logic_error);
615 // uncompress -> compress
616 codec_->resetStream();
617 EXPECT_TRUE(uncompress(StreamCodec::FlushOp::FLUSH));
618 EXPECT_THROW(compress(), std::logic_error);
620 codec_->resetStream();
621 EXPECT_FALSE(compress());
622 EXPECT_TRUE(compress(StreamCodec::FlushOp::END));
623 EXPECT_THROW(compress(), std::logic_error);
625 codec_->resetStream();
626 EXPECT_TRUE(uncompress(StreamCodec::FlushOp::FLUSH));
627 EXPECT_THROW(uncompress(), std::logic_error);
629 codec_->resetStream();
630 EXPECT_FALSE(compress(StreamCodec::FlushOp::FLUSH, true));
631 EXPECT_THROW(compress(), std::logic_error);
633 codec_->resetStream();
634 EXPECT_FALSE(compress(StreamCodec::FlushOp::FLUSH, true));
635 EXPECT_THROW(compress(StreamCodec::FlushOp::END), std::logic_error);
636 // undefined -> compress
637 codec_->compress(inBuffer.get());
638 EXPECT_THROW(compress(), std::logic_error);
639 codec_->uncompress(compressed.get());
640 EXPECT_THROW(compress(), std::logic_error);
641 // undefined -> undefined
642 codec_->uncompress(compressed.get());
643 codec_->compress(inBuffer.get());
646 INSTANTIATE_TEST_CASE_P(
649 testing::ValuesIn(availableStreamCodecs()));
651 class StreamingCompressionTest
652 : public testing::TestWithParam<std::tuple<int, int, CodecType>> {
654 void SetUp() override {
655 auto const tup = GetParam();
656 uncompressedLength_ = uint64_t(1) << std::get<0>(tup);
657 chunkSize_ = size_t(1) << std::get<1>(tup);
658 codec_ = getStreamCodec(std::get<2>(tup));
661 void runResetStreamTest(DataHolder const& dh);
662 void runCompressStreamTest(DataHolder const& dh);
663 void runUncompressStreamTest(DataHolder const& dh);
664 void runFlushTest(DataHolder const& dh);
667 std::vector<ByteRange> split(ByteRange data) const;
669 uint64_t uncompressedLength_;
671 std::unique_ptr<StreamCodec> codec_;
674 std::vector<ByteRange> StreamingCompressionTest::split(ByteRange data) const {
675 size_t const pieces = std::max<size_t>(1, data.size() / chunkSize_);
676 std::vector<ByteRange> result;
677 result.reserve(pieces + 1);
678 while (!data.empty()) {
679 size_t const pieceSize = std::min(data.size(), chunkSize_);
680 result.push_back(data.subpiece(0, pieceSize));
681 data.uncheckedAdvance(pieceSize);
686 static std::unique_ptr<IOBuf> compressSome(
690 StreamCodec::FlushOp flush) {
694 auto buffer = IOBuf::create(bufferSize);
695 buffer->append(buffer->capacity());
696 MutableByteRange output{buffer->writableData(), buffer->length()};
698 result = codec->compressStream(data, output, flush);
699 buffer->trimEnd(output.size());
700 queue.append(std::move(buffer));
702 } while (!(flush == StreamCodec::FlushOp::NONE && data.empty()) && !result);
703 EXPECT_TRUE(data.empty());
707 static std::pair<bool, std::unique_ptr<IOBuf>> uncompressSome(
711 StreamCodec::FlushOp flush) {
715 auto buffer = IOBuf::create(bufferSize);
716 buffer->append(buffer->capacity());
717 MutableByteRange output{buffer->writableData(), buffer->length()};
719 result = codec->uncompressStream(data, output, flush);
720 buffer->trimEnd(output.size());
721 queue.append(std::move(buffer));
723 } while (queue.tailroom() == 0 && !result);
724 return std::make_pair(result, queue.move());
727 void StreamingCompressionTest::runResetStreamTest(DataHolder const& dh) {
728 auto const input = dh.data(uncompressedLength_);
729 // Compress some but leave state unclean
730 codec_->resetStream(uncompressedLength_);
731 compressSome(codec_.get(), input, chunkSize_, StreamCodec::FlushOp::NONE);
732 // Reset stream and compress all
733 codec_->resetStream();
735 compressSome(codec_.get(), input, chunkSize_, StreamCodec::FlushOp::END);
736 auto const uncompressed = codec_->uncompress(compressed.get(), input.size());
737 EXPECT_EQ(dh.hash(uncompressedLength_), hashIOBuf(uncompressed.get()));
740 TEST_P(StreamingCompressionTest, resetStream) {
741 runResetStreamTest(constantDataHolder);
742 runResetStreamTest(randomDataHolder);
745 void StreamingCompressionTest::runCompressStreamTest(
746 const folly::io::test::DataHolder& dh) {
747 auto const inputs = split(dh.data(uncompressedLength_));
750 codec_->resetStream(uncompressedLength_);
751 // Compress many inputs in a row
752 for (auto const input : inputs) {
753 queue.append(compressSome(
754 codec_.get(), input, chunkSize_, StreamCodec::FlushOp::NONE));
756 // Finish the operation with empty input.
759 compressSome(codec_.get(), empty, chunkSize_, StreamCodec::FlushOp::END));
761 auto const uncompressed = codec_->uncompress(queue.front());
762 EXPECT_EQ(dh.hash(uncompressedLength_), hashIOBuf(uncompressed.get()));
765 TEST_P(StreamingCompressionTest, compressStream) {
766 runCompressStreamTest(constantDataHolder);
767 runCompressStreamTest(randomDataHolder);
770 void StreamingCompressionTest::runUncompressStreamTest(
771 const folly::io::test::DataHolder& dh) {
772 auto const data = IOBuf::wrapBuffer(dh.data(uncompressedLength_));
773 // Concatenate 3 compressed frames in a row
774 auto compressed = codec_->compress(data.get());
775 compressed->prependChain(codec_->compress(data.get()));
776 compressed->prependChain(codec_->compress(data.get()));
777 // Pass all 3 compressed frames in one input buffer
778 auto input = compressed->coalesce();
779 // Uncompress the first frame
780 codec_->resetStream(data->computeChainDataLength());
782 auto const result = uncompressSome(
783 codec_.get(), input, chunkSize_, StreamCodec::FlushOp::FLUSH);
784 ASSERT_TRUE(result.first);
785 ASSERT_EQ(hashIOBuf(data.get()), hashIOBuf(result.second.get()));
787 // Uncompress the second frame
788 codec_->resetStream();
790 auto const result = uncompressSome(
791 codec_.get(), input, chunkSize_, StreamCodec::FlushOp::END);
792 ASSERT_TRUE(result.first);
793 ASSERT_EQ(hashIOBuf(data.get()), hashIOBuf(result.second.get()));
795 // Uncompress the third frame
796 codec_->resetStream();
798 auto const result = uncompressSome(
799 codec_.get(), input, chunkSize_, StreamCodec::FlushOp::FLUSH);
800 ASSERT_TRUE(result.first);
801 ASSERT_EQ(hashIOBuf(data.get()), hashIOBuf(result.second.get()));
803 EXPECT_TRUE(input.empty());
806 TEST_P(StreamingCompressionTest, uncompressStream) {
807 runUncompressStreamTest(constantDataHolder);
808 runUncompressStreamTest(randomDataHolder);
811 void StreamingCompressionTest::runFlushTest(DataHolder const& dh) {
812 auto const inputs = split(dh.data(uncompressedLength_));
813 auto uncodec = getStreamCodec(codec_->type());
815 codec_->resetStream();
816 for (auto input : inputs) {
817 // Compress some data and flush the stream
818 auto compressed = compressSome(
819 codec_.get(), input, chunkSize_, StreamCodec::FlushOp::FLUSH);
820 auto compressedRange = compressed->coalesce();
821 // Uncompress the compressed data
822 auto result = uncompressSome(
826 StreamCodec::FlushOp::FLUSH);
827 // All compressed data should have been consumed
828 EXPECT_TRUE(compressedRange.empty());
829 // The frame isn't complete
830 EXPECT_FALSE(result.first);
831 // The uncompressed data should be exactly the input data
832 EXPECT_EQ(input.size(), result.second->computeChainDataLength());
833 auto const data = IOBuf::wrapBuffer(input);
834 EXPECT_EQ(hashIOBuf(data.get()), hashIOBuf(result.second.get()));
838 TEST_P(StreamingCompressionTest, testFlush) {
839 runFlushTest(constantDataHolder);
840 runFlushTest(randomDataHolder);
843 INSTANTIATE_TEST_CASE_P(
844 StreamingCompressionTest,
845 StreamingCompressionTest,
847 testing::Values(0, 1, 12, 22, 27),
848 testing::Values(12, 17, 20),
849 testing::ValuesIn(availableStreamCodecs())));
851 class AutomaticCodecTest : public testing::TestWithParam<CodecType> {
853 void SetUp() override {
854 codec_ = getCodec(GetParam());
855 auto_ = getAutoUncompressionCodec();
858 void runSimpleTest(const DataHolder& dh);
860 std::unique_ptr<Codec> codec_;
861 std::unique_ptr<Codec> auto_;
864 void AutomaticCodecTest::runSimpleTest(const DataHolder& dh) {
865 constexpr uint64_t uncompressedLength = 1000;
866 auto original = IOBuf::wrapBuffer(dh.data(uncompressedLength));
867 auto compressed = codec_->compress(original.get());
869 if (!codec_->needsUncompressedLength()) {
870 auto uncompressed = auto_->uncompress(compressed.get());
871 EXPECT_EQ(uncompressedLength, uncompressed->computeChainDataLength());
872 EXPECT_EQ(dh.hash(uncompressedLength), hashIOBuf(uncompressed.get()));
875 auto uncompressed = auto_->uncompress(compressed.get(), uncompressedLength);
876 EXPECT_EQ(uncompressedLength, uncompressed->computeChainDataLength());
877 EXPECT_EQ(dh.hash(uncompressedLength), hashIOBuf(uncompressed.get()));
879 ASSERT_GE(compressed->computeChainDataLength(), 8);
880 for (size_t i = 0; i < 8; ++i) {
881 auto split = compressed->clone();
882 auto rest = compressed->clone();
883 split->trimEnd(split->length() - i);
885 split->appendChain(std::move(rest));
886 auto uncompressed = auto_->uncompress(split.get(), uncompressedLength);
887 EXPECT_EQ(uncompressedLength, uncompressed->computeChainDataLength());
888 EXPECT_EQ(dh.hash(uncompressedLength), hashIOBuf(uncompressed.get()));
892 TEST_P(AutomaticCodecTest, RandomData) {
893 runSimpleTest(randomDataHolder);
896 TEST_P(AutomaticCodecTest, ConstantData) {
897 runSimpleTest(constantDataHolder);
900 TEST_P(AutomaticCodecTest, ValidPrefixes) {
901 const auto prefixes = codec_->validPrefixes();
902 for (const auto& prefix : prefixes) {
903 EXPECT_FALSE(prefix.empty());
904 // Ensure that all strings are at least 8 bytes for LZMA2.
905 // The bytes after the prefix should be ignored by `canUncompress()`.
906 IOBuf data{IOBuf::COPY_BUFFER, prefix, 0, 8};
908 EXPECT_TRUE(codec_->canUncompress(&data));
909 EXPECT_TRUE(auto_->canUncompress(&data));
913 TEST_P(AutomaticCodecTest, NeedsUncompressedLength) {
914 if (codec_->needsUncompressedLength()) {
915 EXPECT_TRUE(auto_->needsUncompressedLength());
919 TEST_P(AutomaticCodecTest, maxUncompressedLength) {
920 EXPECT_LE(codec_->maxUncompressedLength(), auto_->maxUncompressedLength());
923 TEST_P(AutomaticCodecTest, DefaultCodec) {
924 const uint64_t length = 42;
925 std::vector<std::unique_ptr<Codec>> codecs;
926 codecs.push_back(getCodec(CodecType::ZSTD));
927 auto automatic = getAutoUncompressionCodec(std::move(codecs));
928 auto original = IOBuf::wrapBuffer(constantDataHolder.data(length));
929 auto compressed = codec_->compress(original.get());
930 auto decompressed = automatic->uncompress(compressed.get());
932 EXPECT_EQ(constantDataHolder.hash(length), hashIOBuf(decompressed.get()));
936 class CustomCodec : public Codec {
938 static std::unique_ptr<Codec> create(std::string prefix, CodecType type) {
939 return std::make_unique<CustomCodec>(std::move(prefix), type);
941 explicit CustomCodec(std::string prefix, CodecType type)
942 : Codec(CodecType::USER_DEFINED),
943 prefix_(std::move(prefix)),
944 codec_(getCodec(type)) {}
947 std::vector<std::string> validPrefixes() const override {
951 uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override {
952 return codec_->maxCompressedLength(uncompressedLength) + prefix_.size();
955 bool canUncompress(const IOBuf* data, Optional<uint64_t>) const override {
956 auto clone = data->cloneCoalescedAsValue();
957 if (clone.length() < prefix_.size()) {
960 return memcmp(clone.data(), prefix_.data(), prefix_.size()) == 0;
963 std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override {
964 auto result = IOBuf::copyBuffer(prefix_);
965 result->appendChain(codec_->compress(data));
966 EXPECT_TRUE(canUncompress(result.get(), data->computeChainDataLength()));
970 std::unique_ptr<IOBuf> doUncompress(
972 Optional<uint64_t> uncompressedLength) override {
973 EXPECT_TRUE(canUncompress(data, uncompressedLength));
974 auto clone = data->cloneCoalescedAsValue();
975 clone.trimStart(prefix_.size());
976 return codec_->uncompress(&clone, uncompressedLength);
980 std::unique_ptr<Codec> codec_;
984 TEST_P(AutomaticCodecTest, CustomCodec) {
985 const uint64_t length = 42;
986 auto ab = CustomCodec::create("ab", CodecType::ZSTD);
987 std::vector<std::unique_ptr<Codec>> codecs;
988 codecs.push_back(CustomCodec::create("ab", CodecType::ZSTD));
989 auto automatic = getAutoUncompressionCodec(std::move(codecs));
990 auto original = IOBuf::wrapBuffer(constantDataHolder.data(length));
992 auto abCompressed = ab->compress(original.get());
993 auto abDecompressed = automatic->uncompress(abCompressed.get());
994 EXPECT_TRUE(automatic->canUncompress(abCompressed.get()));
995 EXPECT_FALSE(auto_->canUncompress(abCompressed.get()));
996 EXPECT_EQ(constantDataHolder.hash(length), hashIOBuf(abDecompressed.get()));
998 auto compressed = codec_->compress(original.get());
999 auto decompressed = automatic->uncompress(compressed.get());
1000 EXPECT_EQ(constantDataHolder.hash(length), hashIOBuf(decompressed.get()));
1003 TEST_P(AutomaticCodecTest, CustomDefaultCodec) {
1004 const uint64_t length = 42;
1005 auto none = CustomCodec::create("none", CodecType::NO_COMPRESSION);
1006 std::vector<std::unique_ptr<Codec>> codecs;
1007 codecs.push_back(CustomCodec::create("none", CodecType::NO_COMPRESSION));
1008 codecs.push_back(getCodec(CodecType::LZ4_FRAME));
1009 auto automatic = getAutoUncompressionCodec(std::move(codecs));
1010 auto original = IOBuf::wrapBuffer(constantDataHolder.data(length));
1012 auto noneCompressed = none->compress(original.get());
1013 auto noneDecompressed = automatic->uncompress(noneCompressed.get());
1014 EXPECT_TRUE(automatic->canUncompress(noneCompressed.get()));
1015 EXPECT_FALSE(auto_->canUncompress(noneCompressed.get()));
1016 EXPECT_EQ(constantDataHolder.hash(length), hashIOBuf(noneDecompressed.get()));
1018 auto compressed = codec_->compress(original.get());
1019 auto decompressed = automatic->uncompress(compressed.get());
1020 EXPECT_EQ(constantDataHolder.hash(length), hashIOBuf(decompressed.get()));
1023 TEST_P(AutomaticCodecTest, canUncompressOneBytes) {
1024 // No default codec can uncompress 1 bytes.
1025 IOBuf buf{IOBuf::CREATE, 1};
1027 EXPECT_FALSE(codec_->canUncompress(&buf, 1));
1028 EXPECT_FALSE(codec_->canUncompress(&buf, folly::none));
1029 EXPECT_FALSE(auto_->canUncompress(&buf, 1));
1030 EXPECT_FALSE(auto_->canUncompress(&buf, folly::none));
1033 INSTANTIATE_TEST_CASE_P(
1037 CodecType::LZ4_FRAME,
1044 TEST(ValidPrefixesTest, CustomCodec) {
1045 std::vector<std::unique_ptr<Codec>> codecs;
1046 codecs.push_back(CustomCodec::create("none", CodecType::NO_COMPRESSION));
1047 const auto none = getAutoUncompressionCodec(std::move(codecs));
1048 const auto prefixes = none->validPrefixes();
1049 const auto it = std::find(prefixes.begin(), prefixes.end(), "none");
1050 EXPECT_TRUE(it != prefixes.end());
1053 #define EXPECT_THROW_IF_DEBUG(statement, expected_exception) \
1056 EXPECT_THROW((statement), expected_exception); \
1058 EXPECT_NO_THROW((statement)); \
1062 TEST(CheckCompatibleTest, SimplePrefixSecond) {
1063 std::vector<std::unique_ptr<Codec>> codecs;
1064 codecs.push_back(CustomCodec::create("abc", CodecType::NO_COMPRESSION));
1065 codecs.push_back(CustomCodec::create("ab", CodecType::NO_COMPRESSION));
1066 EXPECT_THROW_IF_DEBUG(
1067 getAutoUncompressionCodec(std::move(codecs)), std::invalid_argument);
1070 TEST(CheckCompatibleTest, SimplePrefixFirst) {
1071 std::vector<std::unique_ptr<Codec>> codecs;
1072 codecs.push_back(CustomCodec::create("ab", CodecType::NO_COMPRESSION));
1073 codecs.push_back(CustomCodec::create("abc", CodecType::NO_COMPRESSION));
1074 EXPECT_THROW_IF_DEBUG(
1075 getAutoUncompressionCodec(std::move(codecs)), std::invalid_argument);
1078 TEST(CheckCompatibleTest, Empty) {
1079 std::vector<std::unique_ptr<Codec>> codecs;
1080 codecs.push_back(CustomCodec::create("", CodecType::NO_COMPRESSION));
1081 EXPECT_THROW_IF_DEBUG(
1082 getAutoUncompressionCodec(std::move(codecs)), std::invalid_argument);
1085 TEST(CheckCompatibleTest, ZstdPrefix) {
1086 std::vector<std::unique_ptr<Codec>> codecs;
1087 codecs.push_back(CustomCodec::create("\x28\xB5\x2F", CodecType::ZSTD));
1088 EXPECT_THROW_IF_DEBUG(
1089 getAutoUncompressionCodec(std::move(codecs)), std::invalid_argument);
1092 TEST(CheckCompatibleTest, ZstdDuplicate) {
1093 std::vector<std::unique_ptr<Codec>> codecs;
1094 codecs.push_back(CustomCodec::create("\x28\xB5\x2F\xFD", CodecType::ZSTD));
1095 EXPECT_THROW_IF_DEBUG(
1096 getAutoUncompressionCodec(std::move(codecs)), std::invalid_argument);
1099 TEST(CheckCompatibleTest, ZlibIsPrefix) {
1100 std::vector<std::unique_ptr<Codec>> codecs;
1101 codecs.push_back(CustomCodec::create("\x18\x76zzasdf", CodecType::ZSTD));
1102 EXPECT_THROW_IF_DEBUG(
1103 getAutoUncompressionCodec(std::move(codecs)), std::invalid_argument);
1106 #if FOLLY_HAVE_LIBZSTD
1108 TEST(ZstdTest, BackwardCompatible) {
1109 auto codec = getCodec(CodecType::ZSTD);
1111 auto const data = IOBuf::wrapBuffer(randomDataHolder.data(size_t(1) << 20));
1112 auto compressed = codec->compress(data.get());
1113 compressed->coalesce();
1116 ZSTD_getDecompressedSize(compressed->data(), compressed->length()));
1120 IOBuf::wrapBuffer(randomDataHolder.data(size_t(100) << 20));
1121 auto compressed = codec->compress(data.get());
1122 compressed->coalesce();
1125 ZSTD_getDecompressedSize(compressed->data(), compressed->length()));
1132 int main(int argc, char *argv[]) {
1133 testing::InitGoogleTest(&argc, argv);
1134 gflags::ParseCommandLineFlags(&argc, &argv, true);
1136 auto ret = RUN_ALL_TESTS();
1138 folly::runBenchmarksOnFlag();