Enforce forward progress with StreamCodec
[folly.git] / folly / io / test / CompressionTest.cpp
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/io/Compression.h>
18
19 #include <algorithm>
20 #include <random>
21 #include <set>
22 #include <thread>
23 #include <unordered_map>
24 #include <utility>
25
26 #include <boost/noncopyable.hpp>
27 #include <glog/logging.h>
28
29 #include <folly/Benchmark.h>
30 #include <folly/Hash.h>
31 #include <folly/Memory.h>
32 #include <folly/Random.h>
33 #include <folly/Varint.h>
34 #include <folly/io/IOBufQueue.h>
35 #include <folly/portability/GTest.h>
36
37 #if FOLLY_HAVE_LIBZSTD
38 #include <zstd.h>
39 #endif
40
41 #if FOLLY_HAVE_LIBZ
42 #include <folly/io/compression/Zlib.h>
43 #endif
44
45 namespace zlib = folly::io::zlib;
46
47 namespace folly {
48 namespace io {
49 namespace test {
50
51 class DataHolder : private boost::noncopyable {
52  public:
53   uint64_t hash(size_t size) const;
54   ByteRange data(size_t size) const;
55
56  protected:
57   explicit DataHolder(size_t sizeLog2);
58   const size_t size_;
59   std::unique_ptr<uint8_t[]> data_;
60   mutable std::unordered_map<uint64_t, uint64_t> hashCache_;
61 };
62
63 DataHolder::DataHolder(size_t sizeLog2)
64   : size_(size_t(1) << sizeLog2),
65     data_(new uint8_t[size_]) {
66 }
67
68 uint64_t DataHolder::hash(size_t size) const {
69   CHECK_LE(size, size_);
70   auto p = hashCache_.find(size);
71   if (p != hashCache_.end()) {
72     return p->second;
73   }
74
75   uint64_t h = folly::hash::fnv64_buf(data_.get(), size);
76   hashCache_[size] = h;
77   return h;
78 }
79
80 ByteRange DataHolder::data(size_t size) const {
81   CHECK_LE(size, size_);
82   return ByteRange(data_.get(), size);
83 }
84
85 uint64_t hashIOBuf(const IOBuf* buf) {
86   uint64_t h = folly::hash::FNV_64_HASH_START;
87   for (auto& range : *buf) {
88     h = folly::hash::fnv64_buf(range.data(), range.size(), h);
89   }
90   return h;
91 }
92
93 class RandomDataHolder : public DataHolder {
94  public:
95   explicit RandomDataHolder(size_t sizeLog2);
96 };
97
98 RandomDataHolder::RandomDataHolder(size_t sizeLog2)
99   : DataHolder(sizeLog2) {
100   static constexpr size_t numThreadsLog2 = 3;
101   static constexpr size_t numThreads = size_t(1) << numThreadsLog2;
102
103   uint32_t seed = randomNumberSeed();
104
105   std::vector<std::thread> threads;
106   threads.reserve(numThreads);
107   for (size_t t = 0; t < numThreads; ++t) {
108     threads.emplace_back([this, seed, t, sizeLog2] {
109       std::mt19937 rng(seed + t);
110       size_t countLog2 = sizeLog2 - numThreadsLog2;
111       size_t start = size_t(t) << countLog2;
112       for (size_t i = 0; i < countLog2; ++i) {
113         this->data_[start + i] = rng();
114       }
115     });
116   }
117
118   for (auto& t : threads) {
119     t.join();
120   }
121 }
122
123 class ConstantDataHolder : public DataHolder {
124  public:
125   explicit ConstantDataHolder(size_t sizeLog2);
126 };
127
128 ConstantDataHolder::ConstantDataHolder(size_t sizeLog2)
129   : DataHolder(sizeLog2) {
130   memset(data_.get(), 'a', size_);
131 }
132
133 constexpr size_t dataSizeLog2 = 27;  // 128MiB
134 RandomDataHolder randomDataHolder(dataSizeLog2);
135 ConstantDataHolder constantDataHolder(dataSizeLog2);
136
137 // The intersection of the provided codecs & those that are compiled in.
138 static std::vector<CodecType> supportedCodecs(std::vector<CodecType> const& v) {
139   std::vector<CodecType> supported;
140
141   std::copy_if(
142       std::begin(v),
143       std::end(v),
144       std::back_inserter(supported),
145       hasCodec);
146
147   return supported;
148 }
149
150 // All compiled-in compression codecs.
151 static std::vector<CodecType> availableCodecs() {
152   std::vector<CodecType> codecs;
153
154   for (size_t i = 0; i < static_cast<size_t>(CodecType::NUM_CODEC_TYPES); ++i) {
155     auto type = static_cast<CodecType>(i);
156     if (hasCodec(type)) {
157       codecs.push_back(type);
158     }
159   }
160
161   return codecs;
162 }
163
164 static std::vector<CodecType> availableStreamCodecs() {
165   std::vector<CodecType> codecs;
166
167   for (size_t i = 0; i < static_cast<size_t>(CodecType::NUM_CODEC_TYPES); ++i) {
168     auto type = static_cast<CodecType>(i);
169     if (hasStreamCodec(type)) {
170       codecs.push_back(type);
171     }
172   }
173
174   return codecs;
175 }
176
177 TEST(CompressionTestNeedsUncompressedLength, Simple) {
178   static const struct {
179     CodecType type;
180     bool needsUncompressedLength;
181   } expectations[] = {
182       {CodecType::NO_COMPRESSION, false},
183       {CodecType::LZ4, true},
184       {CodecType::SNAPPY, false},
185       {CodecType::ZLIB, false},
186       {CodecType::LZ4_VARINT_SIZE, false},
187       {CodecType::LZMA2, false},
188       {CodecType::LZMA2_VARINT_SIZE, false},
189       {CodecType::ZSTD, false},
190       {CodecType::GZIP, false},
191       {CodecType::LZ4_FRAME, false},
192       {CodecType::BZIP2, false},
193   };
194
195   for (auto const& test : expectations) {
196     if (hasCodec(test.type)) {
197       EXPECT_EQ(getCodec(test.type)->needsUncompressedLength(),
198                 test.needsUncompressedLength);
199     }
200   }
201 }
202
203 class CompressionTest
204     : public testing::TestWithParam<std::tr1::tuple<int, int, CodecType>> {
205  protected:
206   void SetUp() override {
207     auto tup = GetParam();
208     uncompressedLength_ = uint64_t(1) << std::tr1::get<0>(tup);
209     chunks_ = std::tr1::get<1>(tup);
210     codec_ = getCodec(std::tr1::get<2>(tup));
211   }
212
213   void runSimpleIOBufTest(const DataHolder& dh);
214
215   void runSimpleStringTest(const DataHolder& dh);
216
217  private:
218   std::unique_ptr<IOBuf> split(std::unique_ptr<IOBuf> data) const;
219
220   uint64_t uncompressedLength_;
221   size_t chunks_;
222   std::unique_ptr<Codec> codec_;
223 };
224
225 void CompressionTest::runSimpleIOBufTest(const DataHolder& dh) {
226   const auto original = split(IOBuf::wrapBuffer(dh.data(uncompressedLength_)));
227   const auto compressed = split(codec_->compress(original.get()));
228   if (!codec_->needsUncompressedLength()) {
229     auto uncompressed = codec_->uncompress(compressed.get());
230     EXPECT_EQ(uncompressedLength_, uncompressed->computeChainDataLength());
231     EXPECT_EQ(dh.hash(uncompressedLength_), hashIOBuf(uncompressed.get()));
232   }
233   {
234     auto uncompressed = codec_->uncompress(compressed.get(),
235                                            uncompressedLength_);
236     EXPECT_EQ(uncompressedLength_, uncompressed->computeChainDataLength());
237     EXPECT_EQ(dh.hash(uncompressedLength_), hashIOBuf(uncompressed.get()));
238   }
239 }
240
241 void CompressionTest::runSimpleStringTest(const DataHolder& dh) {
242   const auto original = std::string(
243       reinterpret_cast<const char*>(dh.data(uncompressedLength_).data()),
244       uncompressedLength_);
245   const auto compressed = codec_->compress(original);
246   if (!codec_->needsUncompressedLength()) {
247     auto uncompressed = codec_->uncompress(compressed);
248     EXPECT_EQ(uncompressedLength_, uncompressed.length());
249     EXPECT_EQ(uncompressed, original);
250   }
251   {
252     auto uncompressed = codec_->uncompress(compressed, uncompressedLength_);
253     EXPECT_EQ(uncompressedLength_, uncompressed.length());
254     EXPECT_EQ(uncompressed, original);
255   }
256 }
257
258 // Uniformly split data into (potentially empty) chunks.
259 std::unique_ptr<IOBuf> CompressionTest::split(
260     std::unique_ptr<IOBuf> data) const {
261   if (data->isChained()) {
262     data->coalesce();
263   }
264
265   const size_t size = data->computeChainDataLength();
266
267   std::multiset<size_t> splits;
268   for (size_t i = 1; i < chunks_; ++i) {
269     splits.insert(Random::rand64(size));
270   }
271
272   folly::IOBufQueue result;
273
274   size_t offset = 0;
275   for (size_t split : splits) {
276     result.append(IOBuf::copyBuffer(data->data() + offset, split - offset));
277     offset = split;
278   }
279   result.append(IOBuf::copyBuffer(data->data() + offset, size - offset));
280
281   return result.move();
282 }
283
284 TEST_P(CompressionTest, RandomData) {
285   runSimpleIOBufTest(randomDataHolder);
286 }
287
288 TEST_P(CompressionTest, ConstantData) {
289   runSimpleIOBufTest(constantDataHolder);
290 }
291
292 TEST_P(CompressionTest, RandomDataString) {
293   runSimpleStringTest(randomDataHolder);
294 }
295
296 TEST_P(CompressionTest, ConstantDataString) {
297   runSimpleStringTest(constantDataHolder);
298 }
299
300 INSTANTIATE_TEST_CASE_P(
301     CompressionTest,
302     CompressionTest,
303     testing::Combine(
304         testing::Values(0, 1, 12, 22, 25, 27),
305         testing::Values(1, 2, 3, 8, 65),
306         testing::ValuesIn(availableCodecs())));
307
308 class CompressionVarintTest
309     : public testing::TestWithParam<std::tr1::tuple<int, CodecType>> {
310  protected:
311   void SetUp() override {
312     auto tup = GetParam();
313     uncompressedLength_ = uint64_t(1) << std::tr1::get<0>(tup);
314     codec_ = getCodec(std::tr1::get<1>(tup));
315   }
316
317   void runSimpleTest(const DataHolder& dh);
318
319   uint64_t uncompressedLength_;
320   std::unique_ptr<Codec> codec_;
321 };
322
323 inline uint64_t oneBasedMsbPos(uint64_t number) {
324   uint64_t pos = 0;
325   for (; number > 0; ++pos, number >>= 1) {
326   }
327   return pos;
328 }
329
330 void CompressionVarintTest::runSimpleTest(const DataHolder& dh) {
331   auto original = IOBuf::wrapBuffer(dh.data(uncompressedLength_));
332   auto compressed = codec_->compress(original.get());
333   auto breakPoint =
334       1UL +
335       Random::rand64(
336           std::max(uint64_t(9), oneBasedMsbPos(uncompressedLength_)) / 9UL);
337   auto tinyBuf = IOBuf::copyBuffer(compressed->data(),
338                                    std::min(compressed->length(), breakPoint));
339   compressed->trimStart(breakPoint);
340   tinyBuf->prependChain(std::move(compressed));
341   compressed = std::move(tinyBuf);
342
343   auto uncompressed = codec_->uncompress(compressed.get());
344
345   EXPECT_EQ(uncompressedLength_, uncompressed->computeChainDataLength());
346   EXPECT_EQ(dh.hash(uncompressedLength_), hashIOBuf(uncompressed.get()));
347 }
348
349 TEST_P(CompressionVarintTest, RandomData) {
350   runSimpleTest(randomDataHolder);
351 }
352
353 TEST_P(CompressionVarintTest, ConstantData) {
354   runSimpleTest(constantDataHolder);
355 }
356
357 INSTANTIATE_TEST_CASE_P(
358     CompressionVarintTest,
359     CompressionVarintTest,
360     testing::Combine(
361         testing::Values(0, 1, 12, 22, 25, 27),
362         testing::ValuesIn(supportedCodecs({
363             CodecType::LZ4_VARINT_SIZE,
364             CodecType::LZMA2_VARINT_SIZE,
365         }))));
366
367 TEST(LZMATest, UncompressBadVarint) {
368   if (hasStreamCodec(CodecType::LZMA2_VARINT_SIZE)) {
369     std::string const str(kMaxVarintLength64 * 2, '\xff');
370     ByteRange input((folly::StringPiece(str)));
371     auto codec = getStreamCodec(CodecType::LZMA2_VARINT_SIZE);
372     auto buffer = IOBuf::create(16);
373     buffer->append(buffer->capacity());
374     MutableByteRange output{buffer->writableData(), buffer->length()};
375     EXPECT_THROW(codec->uncompressStream(input, output), std::runtime_error);
376   }
377 }
378
379 class CompressionCorruptionTest : public testing::TestWithParam<CodecType> {
380  protected:
381   void SetUp() override { codec_ = getCodec(GetParam()); }
382
383   void runSimpleTest(const DataHolder& dh);
384
385   std::unique_ptr<Codec> codec_;
386 };
387
388 void CompressionCorruptionTest::runSimpleTest(const DataHolder& dh) {
389   constexpr uint64_t uncompressedLength = 42;
390   auto original = IOBuf::wrapBuffer(dh.data(uncompressedLength));
391   auto compressed = codec_->compress(original.get());
392
393   if (!codec_->needsUncompressedLength()) {
394     auto uncompressed = codec_->uncompress(compressed.get());
395     EXPECT_EQ(uncompressedLength, uncompressed->computeChainDataLength());
396     EXPECT_EQ(dh.hash(uncompressedLength), hashIOBuf(uncompressed.get()));
397   }
398   {
399     auto uncompressed = codec_->uncompress(compressed.get(),
400                                            uncompressedLength);
401     EXPECT_EQ(uncompressedLength, uncompressed->computeChainDataLength());
402     EXPECT_EQ(dh.hash(uncompressedLength), hashIOBuf(uncompressed.get()));
403   }
404
405   EXPECT_THROW(codec_->uncompress(compressed.get(), uncompressedLength + 1),
406                std::runtime_error);
407
408   auto corrupted = compressed->clone();
409   corrupted->unshare();
410   // Truncate the last character
411   corrupted->prev()->trimEnd(1);
412   if (!codec_->needsUncompressedLength()) {
413     EXPECT_THROW(codec_->uncompress(corrupted.get()),
414                  std::runtime_error);
415   }
416
417   EXPECT_THROW(codec_->uncompress(corrupted.get(), uncompressedLength),
418                std::runtime_error);
419
420   corrupted = compressed->clone();
421   corrupted->unshare();
422   // Corrupt the first character
423   ++(corrupted->writableData()[0]);
424
425   if (!codec_->needsUncompressedLength()) {
426     EXPECT_THROW(codec_->uncompress(corrupted.get()),
427                  std::runtime_error);
428   }
429
430   EXPECT_THROW(codec_->uncompress(corrupted.get(), uncompressedLength),
431                std::runtime_error);
432 }
433
434 TEST_P(CompressionCorruptionTest, RandomData) {
435   runSimpleTest(randomDataHolder);
436 }
437
438 TEST_P(CompressionCorruptionTest, ConstantData) {
439   runSimpleTest(constantDataHolder);
440 }
441
442 INSTANTIATE_TEST_CASE_P(
443     CompressionCorruptionTest,
444     CompressionCorruptionTest,
445     testing::ValuesIn(
446         // NO_COMPRESSION can't detect corruption
447         // LZ4 can't detect corruption reliably (sigh)
448         supportedCodecs({
449             CodecType::SNAPPY,
450             CodecType::ZLIB,
451             CodecType::LZMA2,
452             CodecType::ZSTD,
453             CodecType::LZ4_FRAME,
454             CodecType::BZIP2,
455         })));
456
457 class StreamingUnitTest : public testing::TestWithParam<CodecType> {
458  protected:
459   void SetUp() override {
460     codec_ = getStreamCodec(GetParam());
461   }
462
463   std::unique_ptr<StreamCodec> codec_;
464 };
465
466 TEST(StreamingUnitTest, needsDataLength) {
467   static const struct {
468     CodecType type;
469     bool needsDataLength;
470   } expectations[] = {
471       {CodecType::ZLIB, false},
472       {CodecType::GZIP, false},
473       {CodecType::LZMA2, false},
474       {CodecType::LZMA2_VARINT_SIZE, true},
475       {CodecType::ZSTD, false},
476   };
477
478   for (auto const& test : expectations) {
479     if (hasStreamCodec(test.type)) {
480       EXPECT_EQ(
481           getStreamCodec(test.type)->needsDataLength(), test.needsDataLength);
482     }
483   }
484 }
485
486 TEST_P(StreamingUnitTest, maxCompressedLength) {
487   EXPECT_EQ(0, codec_->maxCompressedLength(0));
488   for (uint64_t const length : {1, 10, 100, 1000, 10000, 100000, 1000000}) {
489     EXPECT_GE(codec_->maxCompressedLength(length), length);
490   }
491 }
492
493 TEST_P(StreamingUnitTest, getUncompressedLength) {
494   auto const empty = IOBuf::create(0);
495   EXPECT_EQ(uint64_t(0), codec_->getUncompressedLength(empty.get()));
496   EXPECT_EQ(uint64_t(0), codec_->getUncompressedLength(empty.get(), 0));
497
498   auto const data = IOBuf::wrapBuffer(randomDataHolder.data(100));
499   auto const compressed = codec_->compress(data.get());
500
501   EXPECT_ANY_THROW(codec_->getUncompressedLength(data.get(), 0));
502   if (auto const length = codec_->getUncompressedLength(data.get())) {
503     EXPECT_EQ(100, *length);
504   }
505   EXPECT_EQ(uint64_t(100), codec_->getUncompressedLength(data.get(), 100));
506   // If the uncompressed length is stored in the frame, then make sure it throws
507   // when it is given the wrong length.
508   if (codec_->getUncompressedLength(data.get()) == uint64_t(100)) {
509     EXPECT_ANY_THROW(codec_->getUncompressedLength(data.get(), 200));
510   }
511 }
512
513 TEST_P(StreamingUnitTest, emptyData) {
514   ByteRange input{};
515   auto buffer = IOBuf::create(1);
516   buffer->append(buffer->capacity());
517   MutableByteRange output{};
518
519   // Test compressing empty data in one pass
520   if (!codec_->needsDataLength()) {
521     EXPECT_TRUE(
522         codec_->compressStream(input, output, StreamCodec::FlushOp::END));
523   }
524   codec_->resetStream(0);
525   EXPECT_TRUE(codec_->compressStream(input, output, StreamCodec::FlushOp::END));
526   output = {buffer->writableData(), buffer->length()};
527   EXPECT_TRUE(codec_->compressStream(input, output, StreamCodec::FlushOp::END));
528   EXPECT_EQ(buffer->length(), output.size());
529
530   // Test compressing empty data with multiple calls to compressStream()
531   codec_->resetStream(0);
532   output = {};
533   EXPECT_FALSE(codec_->compressStream(input, output));
534   EXPECT_TRUE(
535       codec_->compressStream(input, output, StreamCodec::FlushOp::FLUSH));
536   EXPECT_TRUE(codec_->compressStream(input, output, StreamCodec::FlushOp::END));
537   codec_->resetStream(0);
538   output = {buffer->writableData(), buffer->length()};
539   EXPECT_FALSE(codec_->compressStream(input, output));
540   EXPECT_TRUE(
541       codec_->compressStream(input, output, StreamCodec::FlushOp::FLUSH));
542   EXPECT_TRUE(codec_->compressStream(input, output, StreamCodec::FlushOp::END));
543   EXPECT_EQ(buffer->length(), output.size());
544
545   // Test uncompressing empty data
546   output = {};
547   codec_->resetStream();
548   EXPECT_TRUE(codec_->uncompressStream(input, output));
549   codec_->resetStream();
550   EXPECT_TRUE(
551       codec_->uncompressStream(input, output, StreamCodec::FlushOp::FLUSH));
552   codec_->resetStream();
553   EXPECT_TRUE(
554       codec_->uncompressStream(input, output, StreamCodec::FlushOp::END));
555   codec_->resetStream(0);
556   EXPECT_TRUE(codec_->uncompressStream(input, output));
557   codec_->resetStream(0);
558   EXPECT_TRUE(
559       codec_->uncompressStream(input, output, StreamCodec::FlushOp::FLUSH));
560   codec_->resetStream(0);
561   EXPECT_TRUE(
562       codec_->uncompressStream(input, output, StreamCodec::FlushOp::END));
563 }
564
565 TEST_P(StreamingUnitTest, noForwardProgress) {
566   auto inBuffer = IOBuf::create(2);
567   inBuffer->writableData()[0] = 'a';
568   inBuffer->writableData()[1] = 'a';
569   inBuffer->append(2);
570   const auto compressed = codec_->compress(inBuffer.get());
571   auto outBuffer = IOBuf::create(codec_->maxCompressedLength(2));
572
573   ByteRange emptyInput;
574   MutableByteRange emptyOutput;
575
576   const std::array<StreamCodec::FlushOp, 3> flushOps = {{
577       StreamCodec::FlushOp::NONE,
578       StreamCodec::FlushOp::FLUSH,
579       StreamCodec::FlushOp::END,
580   }};
581
582   // No progress is not okay twice in a row for all flush operations when
583   // compressing
584   for (const auto flushOp : flushOps) {
585     if (codec_->needsDataLength()) {
586       codec_->resetStream(inBuffer->computeChainDataLength());
587     } else {
588       codec_->resetStream();
589     }
590     auto input = inBuffer->coalesce();
591     MutableByteRange output = {outBuffer->writableTail(),
592                                outBuffer->tailroom()};
593     // Compress some data to avoid empty data special casing
594     while (!input.empty()) {
595       codec_->compressStream(input, output);
596     }
597     EXPECT_FALSE(codec_->compressStream(emptyInput, emptyOutput, flushOp));
598     EXPECT_THROW(
599         codec_->compressStream(emptyInput, emptyOutput, flushOp),
600         std::runtime_error);
601   }
602
603   // No progress is not okay twice in a row for all flush operations when
604   // uncompressing
605   for (const auto flushOp : flushOps) {
606     codec_->resetStream();
607     auto input = compressed->coalesce();
608     // Remove the last byte so the operation is incomplete
609     input.uncheckedSubtract(1);
610     MutableByteRange output = {inBuffer->writableData(), inBuffer->length()};
611     // Uncompress some data to avoid empty data special casing
612     while (!input.empty()) {
613       EXPECT_FALSE(codec_->uncompressStream(input, output));
614     }
615     EXPECT_FALSE(codec_->uncompressStream(emptyInput, emptyOutput, flushOp));
616     EXPECT_THROW(
617         codec_->uncompressStream(emptyInput, emptyOutput, flushOp),
618         std::runtime_error);
619   }
620 }
621
622 TEST_P(StreamingUnitTest, stateTransitions) {
623   auto inBuffer = IOBuf::create(2);
624   inBuffer->writableData()[0] = 'a';
625   inBuffer->writableData()[1] = 'a';
626   inBuffer->append(2);
627   auto compressed = codec_->compress(inBuffer.get());
628   ByteRange const in = compressed->coalesce();
629   auto outBuffer = IOBuf::create(codec_->maxCompressedLength(in.size()));
630   MutableByteRange const out{outBuffer->writableTail(), outBuffer->tailroom()};
631
632   auto compress = [&](
633       StreamCodec::FlushOp flushOp = StreamCodec::FlushOp::NONE,
634       bool empty = false) {
635     auto input = in;
636     auto output = empty ? MutableByteRange{} : out;
637     return codec_->compressStream(input, output, flushOp);
638   };
639   auto compress_all = [&](bool expect,
640                           StreamCodec::FlushOp flushOp =
641                               StreamCodec::FlushOp::NONE,
642                           bool empty = false) {
643     auto input = in;
644     auto output = empty ? MutableByteRange{} : out;
645     while (!input.empty()) {
646       if (expect) {
647         EXPECT_TRUE(codec_->compressStream(input, output, flushOp));
648       } else {
649         EXPECT_FALSE(codec_->compressStream(input, output, flushOp));
650       }
651     }
652   };
653   auto uncompress = [&](
654       StreamCodec::FlushOp flushOp = StreamCodec::FlushOp::NONE,
655       bool empty = false) {
656     auto input = in;
657     auto output = empty ? MutableByteRange{} : out;
658     return codec_->uncompressStream(input, output, flushOp);
659   };
660
661   // compression flow
662   if (!codec_->needsDataLength()) {
663     codec_->resetStream();
664     EXPECT_FALSE(compress());
665     EXPECT_FALSE(compress());
666     EXPECT_TRUE(compress(StreamCodec::FlushOp::FLUSH));
667     EXPECT_FALSE(compress());
668     EXPECT_TRUE(compress(StreamCodec::FlushOp::END));
669   }
670   codec_->resetStream(in.size() * 5);
671   compress_all(false);
672   compress_all(false);
673   compress_all(true, StreamCodec::FlushOp::FLUSH);
674   compress_all(false);
675   compress_all(true, StreamCodec::FlushOp::END);
676
677   // uncompression flow
678   codec_->resetStream();
679   EXPECT_FALSE(uncompress(StreamCodec::FlushOp::NONE, true));
680   codec_->resetStream();
681   EXPECT_FALSE(uncompress(StreamCodec::FlushOp::FLUSH, true));
682   codec_->resetStream();
683   EXPECT_FALSE(uncompress(StreamCodec::FlushOp::NONE, true));
684   codec_->resetStream();
685   EXPECT_FALSE(uncompress(StreamCodec::FlushOp::NONE, true));
686   codec_->resetStream();
687   EXPECT_TRUE(uncompress(StreamCodec::FlushOp::FLUSH));
688   // compress -> uncompress
689   codec_->resetStream(in.size());
690   EXPECT_FALSE(compress());
691   EXPECT_THROW(uncompress(), std::logic_error);
692   // uncompress -> compress
693   codec_->resetStream(inBuffer->computeChainDataLength());
694   EXPECT_TRUE(uncompress(StreamCodec::FlushOp::FLUSH));
695   EXPECT_THROW(compress(), std::logic_error);
696   // end -> compress
697   if (!codec_->needsDataLength()) {
698     codec_->resetStream();
699     EXPECT_FALSE(compress());
700     EXPECT_TRUE(compress(StreamCodec::FlushOp::END));
701     EXPECT_THROW(compress(), std::logic_error);
702   }
703   codec_->resetStream(in.size() * 2);
704   compress_all(false);
705   compress_all(true, StreamCodec::FlushOp::END);
706   EXPECT_THROW(compress(), std::logic_error);
707   // end -> uncompress
708   codec_->resetStream();
709   EXPECT_TRUE(uncompress(StreamCodec::FlushOp::FLUSH));
710   EXPECT_THROW(uncompress(), std::logic_error);
711   // flush -> compress
712   codec_->resetStream(in.size());
713   EXPECT_FALSE(compress(StreamCodec::FlushOp::FLUSH, true));
714   EXPECT_THROW(compress(), std::logic_error);
715   // flush -> end
716   codec_->resetStream(in.size());
717   EXPECT_FALSE(compress(StreamCodec::FlushOp::FLUSH, true));
718   EXPECT_THROW(compress(StreamCodec::FlushOp::END), std::logic_error);
719   // undefined -> compress
720   codec_->compress(inBuffer.get());
721   EXPECT_THROW(compress(), std::logic_error);
722   codec_->uncompress(compressed.get(), inBuffer->computeChainDataLength());
723   EXPECT_THROW(compress(), std::logic_error);
724   // undefined -> undefined
725   codec_->uncompress(compressed.get());
726   codec_->compress(inBuffer.get());
727 }
728
729 INSTANTIATE_TEST_CASE_P(
730     StreamingUnitTest,
731     StreamingUnitTest,
732     testing::ValuesIn(availableStreamCodecs()));
733
734 class StreamingCompressionTest
735     : public testing::TestWithParam<std::tuple<int, int, CodecType>> {
736  protected:
737   void SetUp() override {
738     auto const tup = GetParam();
739     uncompressedLength_ = uint64_t(1) << std::get<0>(tup);
740     chunkSize_ = size_t(1) << std::get<1>(tup);
741     codec_ = getStreamCodec(std::get<2>(tup));
742   }
743
744   void runResetStreamTest(DataHolder const& dh);
745   void runCompressStreamTest(DataHolder const& dh);
746   void runUncompressStreamTest(DataHolder const& dh);
747   void runFlushTest(DataHolder const& dh);
748
749  private:
750   std::vector<ByteRange> split(ByteRange data) const;
751
752   uint64_t uncompressedLength_;
753   size_t chunkSize_;
754   std::unique_ptr<StreamCodec> codec_;
755 };
756
757 std::vector<ByteRange> StreamingCompressionTest::split(ByteRange data) const {
758   size_t const pieces = std::max<size_t>(1, data.size() / chunkSize_);
759   std::vector<ByteRange> result;
760   result.reserve(pieces + 1);
761   while (!data.empty()) {
762     size_t const pieceSize = std::min(data.size(), chunkSize_);
763     result.push_back(data.subpiece(0, pieceSize));
764     data.uncheckedAdvance(pieceSize);
765   }
766   return result;
767 }
768
769 static std::unique_ptr<IOBuf> compressSome(
770     StreamCodec* codec,
771     ByteRange data,
772     uint64_t bufferSize,
773     StreamCodec::FlushOp flush) {
774   bool result;
775   IOBufQueue queue;
776   do {
777     auto buffer = IOBuf::create(bufferSize);
778     buffer->append(buffer->capacity());
779     MutableByteRange output{buffer->writableData(), buffer->length()};
780
781     result = codec->compressStream(data, output, flush);
782     buffer->trimEnd(output.size());
783     queue.append(std::move(buffer));
784
785   } while (!(flush == StreamCodec::FlushOp::NONE && data.empty()) && !result);
786   EXPECT_TRUE(data.empty());
787   return queue.move();
788 }
789
790 static std::pair<bool, std::unique_ptr<IOBuf>> uncompressSome(
791     StreamCodec* codec,
792     ByteRange& data,
793     uint64_t bufferSize,
794     StreamCodec::FlushOp flush) {
795   bool result;
796   IOBufQueue queue;
797   do {
798     auto buffer = IOBuf::create(bufferSize);
799     buffer->append(buffer->capacity());
800     MutableByteRange output{buffer->writableData(), buffer->length()};
801
802     result = codec->uncompressStream(data, output, flush);
803     buffer->trimEnd(output.size());
804     queue.append(std::move(buffer));
805
806   } while (queue.tailroom() == 0 && !result);
807   return std::make_pair(result, queue.move());
808 }
809
810 void StreamingCompressionTest::runResetStreamTest(DataHolder const& dh) {
811   auto const input = dh.data(uncompressedLength_);
812   // Compress some but leave state unclean
813   codec_->resetStream(uncompressedLength_);
814   compressSome(codec_.get(), input, chunkSize_, StreamCodec::FlushOp::NONE);
815   // Reset stream and compress all
816   if (codec_->needsDataLength()) {
817     codec_->resetStream(uncompressedLength_);
818   } else {
819     codec_->resetStream();
820   }
821   auto compressed =
822       compressSome(codec_.get(), input, chunkSize_, StreamCodec::FlushOp::END);
823   auto const uncompressed = codec_->uncompress(compressed.get(), input.size());
824   EXPECT_EQ(dh.hash(uncompressedLength_), hashIOBuf(uncompressed.get()));
825 }
826
827 TEST_P(StreamingCompressionTest, resetStream) {
828   runResetStreamTest(constantDataHolder);
829   runResetStreamTest(randomDataHolder);
830 }
831
832 void StreamingCompressionTest::runCompressStreamTest(
833     const folly::io::test::DataHolder& dh) {
834   auto const inputs = split(dh.data(uncompressedLength_));
835
836   IOBufQueue queue;
837   codec_->resetStream(uncompressedLength_);
838   // Compress many inputs in a row
839   for (auto const input : inputs) {
840     queue.append(compressSome(
841         codec_.get(), input, chunkSize_, StreamCodec::FlushOp::NONE));
842   }
843   // Finish the operation with empty input.
844   ByteRange empty;
845   queue.append(
846       compressSome(codec_.get(), empty, chunkSize_, StreamCodec::FlushOp::END));
847
848   auto const uncompressed = codec_->uncompress(queue.front());
849   EXPECT_EQ(dh.hash(uncompressedLength_), hashIOBuf(uncompressed.get()));
850 }
851
852 TEST_P(StreamingCompressionTest, compressStream) {
853   runCompressStreamTest(constantDataHolder);
854   runCompressStreamTest(randomDataHolder);
855 }
856
857 void StreamingCompressionTest::runUncompressStreamTest(
858     const folly::io::test::DataHolder& dh) {
859   auto const data = IOBuf::wrapBuffer(dh.data(uncompressedLength_));
860   // Concatenate 3 compressed frames in a row
861   auto compressed = codec_->compress(data.get());
862   compressed->prependChain(codec_->compress(data.get()));
863   compressed->prependChain(codec_->compress(data.get()));
864   // Pass all 3 compressed frames in one input buffer
865   auto input = compressed->coalesce();
866   // Uncompress the first frame
867   codec_->resetStream(data->computeChainDataLength());
868   {
869     auto const result = uncompressSome(
870         codec_.get(), input, chunkSize_, StreamCodec::FlushOp::FLUSH);
871     ASSERT_TRUE(result.first);
872     ASSERT_EQ(hashIOBuf(data.get()), hashIOBuf(result.second.get()));
873   }
874   // Uncompress the second frame
875   codec_->resetStream();
876   {
877     auto const result = uncompressSome(
878         codec_.get(), input, chunkSize_, StreamCodec::FlushOp::END);
879     ASSERT_TRUE(result.first);
880     ASSERT_EQ(hashIOBuf(data.get()), hashIOBuf(result.second.get()));
881   }
882   // Uncompress the third frame
883   codec_->resetStream();
884   {
885     auto const result = uncompressSome(
886         codec_.get(), input, chunkSize_, StreamCodec::FlushOp::FLUSH);
887     ASSERT_TRUE(result.first);
888     ASSERT_EQ(hashIOBuf(data.get()), hashIOBuf(result.second.get()));
889   }
890   EXPECT_TRUE(input.empty());
891 }
892
893 TEST_P(StreamingCompressionTest, uncompressStream) {
894   runUncompressStreamTest(constantDataHolder);
895   runUncompressStreamTest(randomDataHolder);
896 }
897
898 void StreamingCompressionTest::runFlushTest(DataHolder const& dh) {
899   auto const inputs = split(dh.data(uncompressedLength_));
900   auto uncodec = getStreamCodec(codec_->type());
901
902   if (codec_->needsDataLength()) {
903     codec_->resetStream(uncompressedLength_);
904   } else {
905     codec_->resetStream();
906   }
907   for (auto input : inputs) {
908     // Compress some data and flush the stream
909     auto compressed = compressSome(
910         codec_.get(), input, chunkSize_, StreamCodec::FlushOp::FLUSH);
911     auto compressedRange = compressed->coalesce();
912     // Uncompress the compressed data
913     auto result = uncompressSome(
914         uncodec.get(),
915         compressedRange,
916         chunkSize_,
917         StreamCodec::FlushOp::FLUSH);
918     // All compressed data should have been consumed
919     EXPECT_TRUE(compressedRange.empty());
920     // The frame isn't complete
921     EXPECT_FALSE(result.first);
922     // The uncompressed data should be exactly the input data
923     EXPECT_EQ(input.size(), result.second->computeChainDataLength());
924     auto const data = IOBuf::wrapBuffer(input);
925     EXPECT_EQ(hashIOBuf(data.get()), hashIOBuf(result.second.get()));
926   }
927 }
928
929 TEST_P(StreamingCompressionTest, testFlush) {
930   runFlushTest(constantDataHolder);
931   runFlushTest(randomDataHolder);
932 }
933
934 INSTANTIATE_TEST_CASE_P(
935     StreamingCompressionTest,
936     StreamingCompressionTest,
937     testing::Combine(
938         testing::Values(0, 1, 12, 22, 27),
939         testing::Values(12, 17, 20),
940         testing::ValuesIn(availableStreamCodecs())));
941
942 class AutomaticCodecTest : public testing::TestWithParam<CodecType> {
943  protected:
944   void SetUp() override {
945     codec_ = getCodec(GetParam());
946     auto_ = getAutoUncompressionCodec();
947   }
948
949   void runSimpleTest(const DataHolder& dh);
950
951   std::unique_ptr<Codec> codec_;
952   std::unique_ptr<Codec> auto_;
953 };
954
955 void AutomaticCodecTest::runSimpleTest(const DataHolder& dh) {
956   constexpr uint64_t uncompressedLength = 1000;
957   auto original = IOBuf::wrapBuffer(dh.data(uncompressedLength));
958   auto compressed = codec_->compress(original.get());
959
960   if (!codec_->needsUncompressedLength()) {
961     auto uncompressed = auto_->uncompress(compressed.get());
962     EXPECT_EQ(uncompressedLength, uncompressed->computeChainDataLength());
963     EXPECT_EQ(dh.hash(uncompressedLength), hashIOBuf(uncompressed.get()));
964   }
965   {
966     auto uncompressed = auto_->uncompress(compressed.get(), uncompressedLength);
967     EXPECT_EQ(uncompressedLength, uncompressed->computeChainDataLength());
968     EXPECT_EQ(dh.hash(uncompressedLength), hashIOBuf(uncompressed.get()));
969   }
970   ASSERT_GE(compressed->computeChainDataLength(), 8);
971   for (size_t i = 0; i < 8; ++i) {
972     auto split = compressed->clone();
973     auto rest = compressed->clone();
974     split->trimEnd(split->length() - i);
975     rest->trimStart(i);
976     split->appendChain(std::move(rest));
977     auto uncompressed = auto_->uncompress(split.get(), uncompressedLength);
978     EXPECT_EQ(uncompressedLength, uncompressed->computeChainDataLength());
979     EXPECT_EQ(dh.hash(uncompressedLength), hashIOBuf(uncompressed.get()));
980   }
981 }
982
983 TEST_P(AutomaticCodecTest, RandomData) {
984   runSimpleTest(randomDataHolder);
985 }
986
987 TEST_P(AutomaticCodecTest, ConstantData) {
988   runSimpleTest(constantDataHolder);
989 }
990
991 TEST_P(AutomaticCodecTest, ValidPrefixes) {
992   const auto prefixes = codec_->validPrefixes();
993   for (const auto& prefix : prefixes) {
994     EXPECT_FALSE(prefix.empty());
995     // Ensure that all strings are at least 8 bytes for LZMA2.
996     // The bytes after the prefix should be ignored by `canUncompress()`.
997     IOBuf data{IOBuf::COPY_BUFFER, prefix, 0, 8};
998     data.append(8);
999     EXPECT_TRUE(codec_->canUncompress(&data));
1000     EXPECT_TRUE(auto_->canUncompress(&data));
1001   }
1002 }
1003
1004 TEST_P(AutomaticCodecTest, NeedsUncompressedLength) {
1005   if (codec_->needsUncompressedLength()) {
1006     EXPECT_TRUE(auto_->needsUncompressedLength());
1007   }
1008 }
1009
1010 TEST_P(AutomaticCodecTest, maxUncompressedLength) {
1011   EXPECT_LE(codec_->maxUncompressedLength(), auto_->maxUncompressedLength());
1012 }
1013
1014 TEST_P(AutomaticCodecTest, DefaultCodec) {
1015   const uint64_t length = 42;
1016   std::vector<std::unique_ptr<Codec>> codecs;
1017   codecs.push_back(getCodec(CodecType::ZSTD));
1018   auto automatic = getAutoUncompressionCodec(std::move(codecs));
1019   auto original = IOBuf::wrapBuffer(constantDataHolder.data(length));
1020   auto compressed = codec_->compress(original.get());
1021   auto decompressed = automatic->uncompress(compressed.get());
1022
1023   EXPECT_EQ(constantDataHolder.hash(length), hashIOBuf(decompressed.get()));
1024 }
1025
1026 namespace {
1027 class CustomCodec : public Codec {
1028  public:
1029   static std::unique_ptr<Codec> create(std::string prefix, CodecType type) {
1030     return std::make_unique<CustomCodec>(std::move(prefix), type);
1031   }
1032   explicit CustomCodec(std::string prefix, CodecType type)
1033       : Codec(CodecType::USER_DEFINED),
1034         prefix_(std::move(prefix)),
1035         codec_(getCodec(type)) {}
1036
1037  private:
1038   std::vector<std::string> validPrefixes() const override {
1039     return {prefix_};
1040   }
1041
1042   uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override {
1043     return codec_->maxCompressedLength(uncompressedLength) + prefix_.size();
1044   }
1045
1046   bool canUncompress(const IOBuf* data, Optional<uint64_t>) const override {
1047     auto clone = data->cloneCoalescedAsValue();
1048     if (clone.length() < prefix_.size()) {
1049       return false;
1050     }
1051     return memcmp(clone.data(), prefix_.data(), prefix_.size()) == 0;
1052   }
1053
1054   std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override {
1055     auto result = IOBuf::copyBuffer(prefix_);
1056     result->appendChain(codec_->compress(data));
1057     EXPECT_TRUE(canUncompress(result.get(), data->computeChainDataLength()));
1058     return result;
1059   }
1060
1061   std::unique_ptr<IOBuf> doUncompress(
1062       const IOBuf* data,
1063       Optional<uint64_t> uncompressedLength) override {
1064     EXPECT_TRUE(canUncompress(data, uncompressedLength));
1065     auto clone = data->cloneCoalescedAsValue();
1066     clone.trimStart(prefix_.size());
1067     return codec_->uncompress(&clone, uncompressedLength);
1068   }
1069
1070   std::string prefix_;
1071   std::unique_ptr<Codec> codec_;
1072 };
1073 }
1074
1075 TEST_P(AutomaticCodecTest, CustomCodec) {
1076   const uint64_t length = 42;
1077   auto ab = CustomCodec::create("ab", CodecType::ZSTD);
1078   std::vector<std::unique_ptr<Codec>> codecs;
1079   codecs.push_back(CustomCodec::create("ab", CodecType::ZSTD));
1080   auto automatic = getAutoUncompressionCodec(std::move(codecs));
1081   auto original = IOBuf::wrapBuffer(constantDataHolder.data(length));
1082
1083   auto abCompressed = ab->compress(original.get());
1084   auto abDecompressed = automatic->uncompress(abCompressed.get());
1085   EXPECT_TRUE(automatic->canUncompress(abCompressed.get()));
1086   EXPECT_FALSE(auto_->canUncompress(abCompressed.get()));
1087   EXPECT_EQ(constantDataHolder.hash(length), hashIOBuf(abDecompressed.get()));
1088
1089   auto compressed = codec_->compress(original.get());
1090   auto decompressed = automatic->uncompress(compressed.get());
1091   EXPECT_EQ(constantDataHolder.hash(length), hashIOBuf(decompressed.get()));
1092 }
1093
1094 TEST_P(AutomaticCodecTest, CustomDefaultCodec) {
1095   const uint64_t length = 42;
1096   auto none = CustomCodec::create("none", CodecType::NO_COMPRESSION);
1097   std::vector<std::unique_ptr<Codec>> codecs;
1098   codecs.push_back(CustomCodec::create("none", CodecType::NO_COMPRESSION));
1099   codecs.push_back(getCodec(CodecType::LZ4_FRAME));
1100   auto automatic = getAutoUncompressionCodec(std::move(codecs));
1101   auto original = IOBuf::wrapBuffer(constantDataHolder.data(length));
1102
1103   auto noneCompressed = none->compress(original.get());
1104   auto noneDecompressed = automatic->uncompress(noneCompressed.get());
1105   EXPECT_TRUE(automatic->canUncompress(noneCompressed.get()));
1106   EXPECT_FALSE(auto_->canUncompress(noneCompressed.get()));
1107   EXPECT_EQ(constantDataHolder.hash(length), hashIOBuf(noneDecompressed.get()));
1108
1109   auto compressed = codec_->compress(original.get());
1110   auto decompressed = automatic->uncompress(compressed.get());
1111   EXPECT_EQ(constantDataHolder.hash(length), hashIOBuf(decompressed.get()));
1112 }
1113
1114 TEST_P(AutomaticCodecTest, canUncompressOneBytes) {
1115   // No default codec can uncompress 1 bytes.
1116   IOBuf buf{IOBuf::CREATE, 1};
1117   buf.append(1);
1118   EXPECT_FALSE(codec_->canUncompress(&buf, 1));
1119   EXPECT_FALSE(codec_->canUncompress(&buf, folly::none));
1120   EXPECT_FALSE(auto_->canUncompress(&buf, 1));
1121   EXPECT_FALSE(auto_->canUncompress(&buf, folly::none));
1122 }
1123
1124 INSTANTIATE_TEST_CASE_P(
1125     AutomaticCodecTest,
1126     AutomaticCodecTest,
1127     testing::Values(
1128         CodecType::LZ4_FRAME,
1129         CodecType::ZSTD,
1130         CodecType::ZLIB,
1131         CodecType::GZIP,
1132         CodecType::LZMA2,
1133         CodecType::BZIP2));
1134
1135 TEST(ValidPrefixesTest, CustomCodec) {
1136   std::vector<std::unique_ptr<Codec>> codecs;
1137   codecs.push_back(CustomCodec::create("none", CodecType::NO_COMPRESSION));
1138   const auto none = getAutoUncompressionCodec(std::move(codecs));
1139   const auto prefixes = none->validPrefixes();
1140   const auto it = std::find(prefixes.begin(), prefixes.end(), "none");
1141   EXPECT_TRUE(it != prefixes.end());
1142 }
1143
1144 #define EXPECT_THROW_IF_DEBUG(statement, expected_exception) \
1145   do {                                                       \
1146     if (kIsDebug) {                                          \
1147       EXPECT_THROW((statement), expected_exception);         \
1148     } else {                                                 \
1149       EXPECT_NO_THROW((statement));                          \
1150     }                                                        \
1151   } while (false)
1152
1153 TEST(CheckCompatibleTest, SimplePrefixSecond) {
1154   std::vector<std::unique_ptr<Codec>> codecs;
1155   codecs.push_back(CustomCodec::create("abc", CodecType::NO_COMPRESSION));
1156   codecs.push_back(CustomCodec::create("ab", CodecType::NO_COMPRESSION));
1157   EXPECT_THROW_IF_DEBUG(
1158       getAutoUncompressionCodec(std::move(codecs)), std::invalid_argument);
1159 }
1160
1161 TEST(CheckCompatibleTest, SimplePrefixFirst) {
1162   std::vector<std::unique_ptr<Codec>> codecs;
1163   codecs.push_back(CustomCodec::create("ab", CodecType::NO_COMPRESSION));
1164   codecs.push_back(CustomCodec::create("abc", CodecType::NO_COMPRESSION));
1165   EXPECT_THROW_IF_DEBUG(
1166       getAutoUncompressionCodec(std::move(codecs)), std::invalid_argument);
1167 }
1168
1169 TEST(CheckCompatibleTest, Empty) {
1170   std::vector<std::unique_ptr<Codec>> codecs;
1171   codecs.push_back(CustomCodec::create("", CodecType::NO_COMPRESSION));
1172   EXPECT_THROW_IF_DEBUG(
1173       getAutoUncompressionCodec(std::move(codecs)), std::invalid_argument);
1174 }
1175
1176 TEST(CheckCompatibleTest, ZstdPrefix) {
1177   std::vector<std::unique_ptr<Codec>> codecs;
1178   codecs.push_back(CustomCodec::create("\x28\xB5\x2F", CodecType::ZSTD));
1179   EXPECT_THROW_IF_DEBUG(
1180       getAutoUncompressionCodec(std::move(codecs)), std::invalid_argument);
1181 }
1182
1183 TEST(CheckCompatibleTest, ZstdDuplicate) {
1184   std::vector<std::unique_ptr<Codec>> codecs;
1185   codecs.push_back(CustomCodec::create("\x28\xB5\x2F\xFD", CodecType::ZSTD));
1186   EXPECT_THROW_IF_DEBUG(
1187       getAutoUncompressionCodec(std::move(codecs)), std::invalid_argument);
1188 }
1189
1190 TEST(CheckCompatibleTest, ZlibIsPrefix) {
1191   std::vector<std::unique_ptr<Codec>> codecs;
1192   codecs.push_back(CustomCodec::create("\x18\x76zzasdf", CodecType::ZSTD));
1193   EXPECT_THROW_IF_DEBUG(
1194       getAutoUncompressionCodec(std::move(codecs)), std::invalid_argument);
1195 }
1196
1197 #if FOLLY_HAVE_LIBZSTD
1198
1199 TEST(ZstdTest, BackwardCompatible) {
1200   auto codec = getCodec(CodecType::ZSTD);
1201   {
1202     auto const data = IOBuf::wrapBuffer(randomDataHolder.data(size_t(1) << 20));
1203     auto compressed = codec->compress(data.get());
1204     compressed->coalesce();
1205     EXPECT_EQ(
1206         data->length(),
1207         ZSTD_getDecompressedSize(compressed->data(), compressed->length()));
1208   }
1209   {
1210     auto const data =
1211         IOBuf::wrapBuffer(randomDataHolder.data(size_t(100) << 20));
1212     auto compressed = codec->compress(data.get());
1213     compressed->coalesce();
1214     EXPECT_EQ(
1215         data->length(),
1216         ZSTD_getDecompressedSize(compressed->data(), compressed->length()));
1217   }
1218 }
1219
1220 #endif
1221
1222 #if FOLLY_HAVE_LIBZ
1223
1224 using ZlibFormat = zlib::Options::Format;
1225
1226 TEST(ZlibTest, Auto) {
1227   size_t const uncompressedLength_ = (size_t)1 << 15;
1228   auto const original = std::string(
1229       reinterpret_cast<const char*>(
1230           randomDataHolder.data(uncompressedLength_).data()),
1231       uncompressedLength_);
1232   auto optionCodec = zlib::getCodec(zlib::Options(ZlibFormat::AUTO));
1233
1234   // Test the codec can uncompress zlib data.
1235   {
1236     auto codec = getCodec(CodecType::ZLIB);
1237     auto const compressed = codec->compress(original);
1238     auto const uncompressed = optionCodec->uncompress(compressed);
1239     EXPECT_EQ(original, uncompressed);
1240   }
1241
1242   // Test the codec can uncompress gzip data.
1243   {
1244     auto codec = getCodec(CodecType::GZIP);
1245     auto const compressed = codec->compress(original);
1246     auto const uncompressed = optionCodec->uncompress(compressed);
1247     EXPECT_EQ(original, uncompressed);
1248   }
1249 }
1250
1251 TEST(ZlibTest, DefaultOptions) {
1252   size_t const uncompressedLength_ = (size_t)1 << 20;
1253   auto const original = std::string(
1254       reinterpret_cast<const char*>(
1255           randomDataHolder.data(uncompressedLength_).data()),
1256       uncompressedLength_);
1257   {
1258     auto codec = getCodec(CodecType::ZLIB);
1259     auto optionCodec = zlib::getCodec(zlib::defaultZlibOptions());
1260     auto const compressed = optionCodec->compress(original);
1261     auto uncompressed = codec->uncompress(compressed);
1262     EXPECT_EQ(original, uncompressed);
1263     uncompressed = optionCodec->uncompress(compressed);
1264     EXPECT_EQ(original, uncompressed);
1265   }
1266
1267   {
1268     auto codec = getCodec(CodecType::GZIP);
1269     auto optionCodec = zlib::getCodec(zlib::defaultGzipOptions());
1270     auto const compressed = optionCodec->compress(original);
1271     auto uncompressed = codec->uncompress(compressed);
1272     EXPECT_EQ(original, uncompressed);
1273     uncompressed = optionCodec->uncompress(compressed);
1274     EXPECT_EQ(original, uncompressed);
1275   }
1276 }
1277
1278 class ZlibOptionsTest : public testing::TestWithParam<
1279                             std::tr1::tuple<ZlibFormat, int, int, int>> {
1280  protected:
1281   void SetUp() override {
1282     auto tup = GetParam();
1283     options_.format = std::tr1::get<0>(tup);
1284     options_.windowSize = std::tr1::get<1>(tup);
1285     options_.memLevel = std::tr1::get<2>(tup);
1286     options_.strategy = std::tr1::get<3>(tup);
1287     codec_ = zlib::getStreamCodec(options_);
1288   }
1289
1290   void runSimpleRoundTripTest(const DataHolder& dh);
1291
1292  private:
1293   zlib::Options options_;
1294   std::unique_ptr<StreamCodec> codec_;
1295 };
1296
1297 void ZlibOptionsTest::runSimpleRoundTripTest(const DataHolder& dh) {
1298   size_t const uncompressedLength = (size_t)1 << 16;
1299   auto const original = std::string(
1300       reinterpret_cast<const char*>(dh.data(uncompressedLength).data()),
1301       uncompressedLength);
1302
1303   auto const compressed = codec_->compress(original);
1304   auto const uncompressed = codec_->uncompress(compressed);
1305   EXPECT_EQ(uncompressed, original);
1306 }
1307
1308 TEST_P(ZlibOptionsTest, simpleRoundTripTest) {
1309   runSimpleRoundTripTest(constantDataHolder);
1310   runSimpleRoundTripTest(randomDataHolder);
1311 }
1312
1313 INSTANTIATE_TEST_CASE_P(
1314     ZlibOptionsTest,
1315     ZlibOptionsTest,
1316     testing::Combine(
1317         testing::Values(
1318             ZlibFormat::ZLIB,
1319             ZlibFormat::GZIP,
1320             ZlibFormat::RAW,
1321             ZlibFormat::AUTO),
1322         testing::Values(9, 12, 15),
1323         testing::Values(1, 8, 9),
1324         testing::Values(
1325             Z_DEFAULT_STRATEGY,
1326             Z_FILTERED,
1327             Z_HUFFMAN_ONLY,
1328             Z_RLE,
1329             Z_FIXED)));
1330
1331 #endif // FOLLY_HAVE_LIBZ
1332
1333 } // namespace test
1334 } // namespace io
1335 } // namespace folly
1336
1337 int main(int argc, char *argv[]) {
1338   testing::InitGoogleTest(&argc, argv);
1339   gflags::ParseCommandLineFlags(&argc, &argv, true);
1340
1341   auto ret = RUN_ALL_TESTS();
1342   if (!ret) {
1343     folly::runBenchmarksOnFlag();
1344   }
1345   return ret;
1346 }