Use Baton (again) in EventBase::runInEventBaseThreadAndWait
[folly.git] / folly / io / test / CompressionTest.cpp
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/io/Compression.h>
18
19 #include <algorithm>
20 #include <random>
21 #include <set>
22 #include <thread>
23 #include <unordered_map>
24 #include <utility>
25
26 #include <boost/noncopyable.hpp>
27 #include <glog/logging.h>
28
29 #include <folly/Benchmark.h>
30 #include <folly/Hash.h>
31 #include <folly/Memory.h>
32 #include <folly/Random.h>
33 #include <folly/Varint.h>
34 #include <folly/io/IOBufQueue.h>
35 #include <folly/portability/GTest.h>
36
37 #if FOLLY_HAVE_LIBZSTD
38 #include <zstd.h>
39 #endif
40
41 namespace folly {
42 namespace io {
43 namespace test {
44
45 class DataHolder : private boost::noncopyable {
46  public:
47   uint64_t hash(size_t size) const;
48   ByteRange data(size_t size) const;
49
50  protected:
51   explicit DataHolder(size_t sizeLog2);
52   const size_t size_;
53   std::unique_ptr<uint8_t[]> data_;
54   mutable std::unordered_map<uint64_t, uint64_t> hashCache_;
55 };
56
57 DataHolder::DataHolder(size_t sizeLog2)
58   : size_(size_t(1) << sizeLog2),
59     data_(new uint8_t[size_]) {
60 }
61
62 uint64_t DataHolder::hash(size_t size) const {
63   CHECK_LE(size, size_);
64   auto p = hashCache_.find(size);
65   if (p != hashCache_.end()) {
66     return p->second;
67   }
68
69   uint64_t h = folly::hash::fnv64_buf(data_.get(), size);
70   hashCache_[size] = h;
71   return h;
72 }
73
74 ByteRange DataHolder::data(size_t size) const {
75   CHECK_LE(size, size_);
76   return ByteRange(data_.get(), size);
77 }
78
79 uint64_t hashIOBuf(const IOBuf* buf) {
80   uint64_t h = folly::hash::FNV_64_HASH_START;
81   for (auto& range : *buf) {
82     h = folly::hash::fnv64_buf(range.data(), range.size(), h);
83   }
84   return h;
85 }
86
87 class RandomDataHolder : public DataHolder {
88  public:
89   explicit RandomDataHolder(size_t sizeLog2);
90 };
91
92 RandomDataHolder::RandomDataHolder(size_t sizeLog2)
93   : DataHolder(sizeLog2) {
94   static constexpr size_t numThreadsLog2 = 3;
95   static constexpr size_t numThreads = size_t(1) << numThreadsLog2;
96
97   uint32_t seed = randomNumberSeed();
98
99   std::vector<std::thread> threads;
100   threads.reserve(numThreads);
101   for (size_t t = 0; t < numThreads; ++t) {
102     threads.emplace_back([this, seed, t, sizeLog2] {
103       std::mt19937 rng(seed + t);
104       size_t countLog2 = sizeLog2 - numThreadsLog2;
105       size_t start = size_t(t) << countLog2;
106       for (size_t i = 0; i < countLog2; ++i) {
107         this->data_[start + i] = rng();
108       }
109     });
110   }
111
112   for (auto& t : threads) {
113     t.join();
114   }
115 }
116
117 class ConstantDataHolder : public DataHolder {
118  public:
119   explicit ConstantDataHolder(size_t sizeLog2);
120 };
121
122 ConstantDataHolder::ConstantDataHolder(size_t sizeLog2)
123   : DataHolder(sizeLog2) {
124   memset(data_.get(), 'a', size_);
125 }
126
127 constexpr size_t dataSizeLog2 = 27;  // 128MiB
128 RandomDataHolder randomDataHolder(dataSizeLog2);
129 ConstantDataHolder constantDataHolder(dataSizeLog2);
130
131 // The intersection of the provided codecs & those that are compiled in.
132 static std::vector<CodecType> supportedCodecs(std::vector<CodecType> const& v) {
133   std::vector<CodecType> supported;
134
135   std::copy_if(
136       std::begin(v),
137       std::end(v),
138       std::back_inserter(supported),
139       hasCodec);
140
141   return supported;
142 }
143
144 // All compiled-in compression codecs.
145 static std::vector<CodecType> availableCodecs() {
146   std::vector<CodecType> codecs;
147
148   for (size_t i = 0; i < static_cast<size_t>(CodecType::NUM_CODEC_TYPES); ++i) {
149     auto type = static_cast<CodecType>(i);
150     if (hasCodec(type)) {
151       codecs.push_back(type);
152     }
153   }
154
155   return codecs;
156 }
157
158 static std::vector<CodecType> availableStreamCodecs() {
159   std::vector<CodecType> codecs;
160
161   for (size_t i = 0; i < static_cast<size_t>(CodecType::NUM_CODEC_TYPES); ++i) {
162     auto type = static_cast<CodecType>(i);
163     if (hasStreamCodec(type)) {
164       codecs.push_back(type);
165     }
166   }
167
168   return codecs;
169 }
170
171 TEST(CompressionTestNeedsUncompressedLength, Simple) {
172   static const struct { CodecType type; bool needsUncompressedLength; }
173     expectations[] = {
174       { CodecType::NO_COMPRESSION, false },
175       { CodecType::LZ4, true },
176       { CodecType::SNAPPY, false },
177       { CodecType::ZLIB, false },
178       { CodecType::LZ4_VARINT_SIZE, false },
179       { CodecType::LZMA2, false },
180       { CodecType::LZMA2_VARINT_SIZE, false },
181       { CodecType::ZSTD, false },
182       { CodecType::GZIP, false },
183       { CodecType::LZ4_FRAME, false },
184       { CodecType::BZIP2, false },
185     };
186
187   for (auto const& test : expectations) {
188     if (hasCodec(test.type)) {
189       EXPECT_EQ(getCodec(test.type)->needsUncompressedLength(),
190                 test.needsUncompressedLength);
191     }
192   }
193 }
194
195 class CompressionTest
196     : public testing::TestWithParam<std::tr1::tuple<int, int, CodecType>> {
197  protected:
198   void SetUp() override {
199     auto tup = GetParam();
200     uncompressedLength_ = uint64_t(1) << std::tr1::get<0>(tup);
201     chunks_ = std::tr1::get<1>(tup);
202     codec_ = getCodec(std::tr1::get<2>(tup));
203   }
204
205   void runSimpleIOBufTest(const DataHolder& dh);
206
207   void runSimpleStringTest(const DataHolder& dh);
208
209  private:
210   std::unique_ptr<IOBuf> split(std::unique_ptr<IOBuf> data) const;
211
212   uint64_t uncompressedLength_;
213   size_t chunks_;
214   std::unique_ptr<Codec> codec_;
215 };
216
217 void CompressionTest::runSimpleIOBufTest(const DataHolder& dh) {
218   const auto original = split(IOBuf::wrapBuffer(dh.data(uncompressedLength_)));
219   const auto compressed = split(codec_->compress(original.get()));
220   if (!codec_->needsUncompressedLength()) {
221     auto uncompressed = codec_->uncompress(compressed.get());
222     EXPECT_EQ(uncompressedLength_, uncompressed->computeChainDataLength());
223     EXPECT_EQ(dh.hash(uncompressedLength_), hashIOBuf(uncompressed.get()));
224   }
225   {
226     auto uncompressed = codec_->uncompress(compressed.get(),
227                                            uncompressedLength_);
228     EXPECT_EQ(uncompressedLength_, uncompressed->computeChainDataLength());
229     EXPECT_EQ(dh.hash(uncompressedLength_), hashIOBuf(uncompressed.get()));
230   }
231 }
232
233 void CompressionTest::runSimpleStringTest(const DataHolder& dh) {
234   const auto original = std::string(
235       reinterpret_cast<const char*>(dh.data(uncompressedLength_).data()),
236       uncompressedLength_);
237   const auto compressed = codec_->compress(original);
238   if (!codec_->needsUncompressedLength()) {
239     auto uncompressed = codec_->uncompress(compressed);
240     EXPECT_EQ(uncompressedLength_, uncompressed.length());
241     EXPECT_EQ(uncompressed, original);
242   }
243   {
244     auto uncompressed = codec_->uncompress(compressed, uncompressedLength_);
245     EXPECT_EQ(uncompressedLength_, uncompressed.length());
246     EXPECT_EQ(uncompressed, original);
247   }
248 }
249
250 // Uniformly split data into (potentially empty) chunks.
251 std::unique_ptr<IOBuf> CompressionTest::split(
252     std::unique_ptr<IOBuf> data) const {
253   if (data->isChained()) {
254     data->coalesce();
255   }
256
257   const size_t size = data->computeChainDataLength();
258
259   std::multiset<size_t> splits;
260   for (size_t i = 1; i < chunks_; ++i) {
261     splits.insert(Random::rand64(size));
262   }
263
264   folly::IOBufQueue result;
265
266   size_t offset = 0;
267   for (size_t split : splits) {
268     result.append(IOBuf::copyBuffer(data->data() + offset, split - offset));
269     offset = split;
270   }
271   result.append(IOBuf::copyBuffer(data->data() + offset, size - offset));
272
273   return result.move();
274 }
275
276 TEST_P(CompressionTest, RandomData) {
277   runSimpleIOBufTest(randomDataHolder);
278 }
279
280 TEST_P(CompressionTest, ConstantData) {
281   runSimpleIOBufTest(constantDataHolder);
282 }
283
284 TEST_P(CompressionTest, RandomDataString) {
285   runSimpleStringTest(randomDataHolder);
286 }
287
288 TEST_P(CompressionTest, ConstantDataString) {
289   runSimpleStringTest(constantDataHolder);
290 }
291
292 INSTANTIATE_TEST_CASE_P(
293     CompressionTest,
294     CompressionTest,
295     testing::Combine(
296         testing::Values(0, 1, 12, 22, 25, 27),
297         testing::Values(1, 2, 3, 8, 65),
298         testing::ValuesIn(availableCodecs())));
299
300 class CompressionVarintTest
301     : public testing::TestWithParam<std::tr1::tuple<int, CodecType>> {
302  protected:
303   void SetUp() override {
304     auto tup = GetParam();
305     uncompressedLength_ = uint64_t(1) << std::tr1::get<0>(tup);
306     codec_ = getCodec(std::tr1::get<1>(tup));
307   }
308
309   void runSimpleTest(const DataHolder& dh);
310
311   uint64_t uncompressedLength_;
312   std::unique_ptr<Codec> codec_;
313 };
314
315 inline uint64_t oneBasedMsbPos(uint64_t number) {
316   uint64_t pos = 0;
317   for (; number > 0; ++pos, number >>= 1) {
318   }
319   return pos;
320 }
321
322 void CompressionVarintTest::runSimpleTest(const DataHolder& dh) {
323   auto original = IOBuf::wrapBuffer(dh.data(uncompressedLength_));
324   auto compressed = codec_->compress(original.get());
325   auto breakPoint =
326       1UL +
327       Random::rand64(
328           std::max(uint64_t(9), oneBasedMsbPos(uncompressedLength_)) / 9UL);
329   auto tinyBuf = IOBuf::copyBuffer(compressed->data(),
330                                    std::min(compressed->length(), breakPoint));
331   compressed->trimStart(breakPoint);
332   tinyBuf->prependChain(std::move(compressed));
333   compressed = std::move(tinyBuf);
334
335   auto uncompressed = codec_->uncompress(compressed.get());
336
337   EXPECT_EQ(uncompressedLength_, uncompressed->computeChainDataLength());
338   EXPECT_EQ(dh.hash(uncompressedLength_), hashIOBuf(uncompressed.get()));
339 }
340
341 TEST_P(CompressionVarintTest, RandomData) {
342   runSimpleTest(randomDataHolder);
343 }
344
345 TEST_P(CompressionVarintTest, ConstantData) {
346   runSimpleTest(constantDataHolder);
347 }
348
349 INSTANTIATE_TEST_CASE_P(
350     CompressionVarintTest,
351     CompressionVarintTest,
352     testing::Combine(
353         testing::Values(0, 1, 12, 22, 25, 27),
354         testing::ValuesIn(supportedCodecs({
355             CodecType::LZ4_VARINT_SIZE,
356             CodecType::LZMA2_VARINT_SIZE,
357             }))));
358
359 class CompressionCorruptionTest : public testing::TestWithParam<CodecType> {
360  protected:
361   void SetUp() override { codec_ = getCodec(GetParam()); }
362
363   void runSimpleTest(const DataHolder& dh);
364
365   std::unique_ptr<Codec> codec_;
366 };
367
368 void CompressionCorruptionTest::runSimpleTest(const DataHolder& dh) {
369   constexpr uint64_t uncompressedLength = 42;
370   auto original = IOBuf::wrapBuffer(dh.data(uncompressedLength));
371   auto compressed = codec_->compress(original.get());
372
373   if (!codec_->needsUncompressedLength()) {
374     auto uncompressed = codec_->uncompress(compressed.get());
375     EXPECT_EQ(uncompressedLength, uncompressed->computeChainDataLength());
376     EXPECT_EQ(dh.hash(uncompressedLength), hashIOBuf(uncompressed.get()));
377   }
378   {
379     auto uncompressed = codec_->uncompress(compressed.get(),
380                                            uncompressedLength);
381     EXPECT_EQ(uncompressedLength, uncompressed->computeChainDataLength());
382     EXPECT_EQ(dh.hash(uncompressedLength), hashIOBuf(uncompressed.get()));
383   }
384
385   EXPECT_THROW(codec_->uncompress(compressed.get(), uncompressedLength + 1),
386                std::runtime_error);
387
388   auto corrupted = compressed->clone();
389   corrupted->unshare();
390   // Truncate the last character
391   corrupted->prev()->trimEnd(1);
392   if (!codec_->needsUncompressedLength()) {
393     EXPECT_THROW(codec_->uncompress(corrupted.get()),
394                  std::runtime_error);
395   }
396
397   EXPECT_THROW(codec_->uncompress(corrupted.get(), uncompressedLength),
398                std::runtime_error);
399
400   corrupted = compressed->clone();
401   corrupted->unshare();
402   // Corrupt the first character
403   ++(corrupted->writableData()[0]);
404
405   if (!codec_->needsUncompressedLength()) {
406     EXPECT_THROW(codec_->uncompress(corrupted.get()),
407                  std::runtime_error);
408   }
409
410   EXPECT_THROW(codec_->uncompress(corrupted.get(), uncompressedLength),
411                std::runtime_error);
412 }
413
414 TEST_P(CompressionCorruptionTest, RandomData) {
415   runSimpleTest(randomDataHolder);
416 }
417
418 TEST_P(CompressionCorruptionTest, ConstantData) {
419   runSimpleTest(constantDataHolder);
420 }
421
422 INSTANTIATE_TEST_CASE_P(
423     CompressionCorruptionTest,
424     CompressionCorruptionTest,
425     testing::ValuesIn(
426         // NO_COMPRESSION can't detect corruption
427         // LZ4 can't detect corruption reliably (sigh)
428         supportedCodecs({
429             CodecType::SNAPPY,
430             CodecType::ZLIB,
431             CodecType::LZMA2,
432             CodecType::ZSTD,
433             CodecType::LZ4_FRAME,
434             CodecType::BZIP2,
435         })));
436
437 class StreamingUnitTest : public testing::TestWithParam<CodecType> {
438  protected:
439   void SetUp() override {
440     codec_ = getStreamCodec(GetParam());
441   }
442
443   std::unique_ptr<StreamCodec> codec_;
444 };
445
446 TEST_P(StreamingUnitTest, maxCompressedLength) {
447   EXPECT_EQ(0, codec_->maxCompressedLength(0));
448   for (uint64_t const length : {1, 10, 100, 1000, 10000, 100000, 1000000}) {
449     EXPECT_GE(codec_->maxCompressedLength(length), length);
450   }
451 }
452
453 TEST_P(StreamingUnitTest, getUncompressedLength) {
454   auto const empty = IOBuf::create(0);
455   EXPECT_EQ(uint64_t(0), codec_->getUncompressedLength(empty.get()));
456   EXPECT_EQ(uint64_t(0), codec_->getUncompressedLength(empty.get(), 0));
457
458   auto const data = IOBuf::wrapBuffer(randomDataHolder.data(100));
459   auto const compressed = codec_->compress(data.get());
460
461   EXPECT_ANY_THROW(codec_->getUncompressedLength(data.get(), 0));
462   if (auto const length = codec_->getUncompressedLength(data.get())) {
463     EXPECT_EQ(100, *length);
464   }
465   EXPECT_EQ(uint64_t(100), codec_->getUncompressedLength(data.get(), 100));
466   // If the uncompressed length is stored in the frame, then make sure it throws
467   // when it is given the wrong length.
468   if (codec_->getUncompressedLength(data.get()) == uint64_t(100)) {
469     EXPECT_ANY_THROW(codec_->getUncompressedLength(data.get(), 200));
470   }
471 }
472
473 TEST_P(StreamingUnitTest, emptyData) {
474   ByteRange input{};
475   auto buffer = IOBuf::create(1);
476   buffer->append(buffer->capacity());
477   MutableByteRange output{};
478
479   // Test compressing empty data in one pass
480   EXPECT_TRUE(codec_->compressStream(input, output, StreamCodec::FlushOp::END));
481   codec_->resetStream(0);
482   EXPECT_TRUE(codec_->compressStream(input, output, StreamCodec::FlushOp::END));
483   codec_->resetStream();
484   output = {buffer->writableData(), buffer->length()};
485   EXPECT_TRUE(codec_->compressStream(input, output, StreamCodec::FlushOp::END));
486   EXPECT_EQ(buffer->length(), output.size());
487
488   // Test compressing empty data with multiple calls to compressStream()
489   codec_->resetStream();
490   output = {};
491   EXPECT_FALSE(codec_->compressStream(input, output));
492   EXPECT_TRUE(
493       codec_->compressStream(input, output, StreamCodec::FlushOp::FLUSH));
494   EXPECT_TRUE(codec_->compressStream(input, output, StreamCodec::FlushOp::END));
495   codec_->resetStream();
496   output = {buffer->writableData(), buffer->length()};
497   EXPECT_FALSE(codec_->compressStream(input, output));
498   EXPECT_TRUE(
499       codec_->compressStream(input, output, StreamCodec::FlushOp::FLUSH));
500   EXPECT_TRUE(codec_->compressStream(input, output, StreamCodec::FlushOp::END));
501   EXPECT_EQ(buffer->length(), output.size());
502
503   // Test uncompressing empty data
504   output = {};
505   codec_->resetStream();
506   EXPECT_TRUE(codec_->uncompressStream(input, output));
507   codec_->resetStream();
508   EXPECT_TRUE(
509       codec_->uncompressStream(input, output, StreamCodec::FlushOp::FLUSH));
510   codec_->resetStream();
511   EXPECT_TRUE(
512       codec_->uncompressStream(input, output, StreamCodec::FlushOp::END));
513   codec_->resetStream(0);
514   EXPECT_TRUE(codec_->uncompressStream(input, output));
515   codec_->resetStream(0);
516   EXPECT_TRUE(
517       codec_->uncompressStream(input, output, StreamCodec::FlushOp::FLUSH));
518   codec_->resetStream(0);
519   EXPECT_TRUE(
520       codec_->uncompressStream(input, output, StreamCodec::FlushOp::END));
521 }
522
523 TEST_P(StreamingUnitTest, noForwardProgressOkay) {
524   auto inBuffer = IOBuf::create(2);
525   inBuffer->writableData()[0] = 'a';
526   inBuffer->writableData()[0] = 'a';
527   inBuffer->append(2);
528   auto input = inBuffer->coalesce();
529   auto compressed = codec_->compress(inBuffer.get());
530
531   auto outBuffer = IOBuf::create(codec_->maxCompressedLength(2));
532   MutableByteRange output{outBuffer->writableTail(), outBuffer->tailroom()};
533
534   ByteRange emptyInput;
535   MutableByteRange emptyOutput;
536
537   // Compress some data to avoid empty data special casing
538   codec_->resetStream();
539   while (!input.empty()) {
540     codec_->compressStream(input, output);
541   }
542   // empty input and output is okay for flush NONE and FLUSH.
543   codec_->compressStream(emptyInput, emptyOutput);
544   codec_->compressStream(emptyInput, emptyOutput, StreamCodec::FlushOp::FLUSH);
545
546   codec_->resetStream();
547   input = inBuffer->coalesce();
548   output = {outBuffer->writableTail(), outBuffer->tailroom()};
549   while (!input.empty()) {
550     codec_->compressStream(input, output);
551   }
552   // empty input and output is okay for flush END.
553   codec_->compressStream(emptyInput, emptyOutput, StreamCodec::FlushOp::END);
554
555   codec_->resetStream();
556   input = compressed->coalesce();
557   input.uncheckedSubtract(1); // Remove last byte so the operation is incomplete
558   output = {inBuffer->writableData(), inBuffer->length()};
559   // Uncompress some data to avoid empty data special casing
560   while (!input.empty()) {
561     EXPECT_FALSE(codec_->uncompressStream(input, output));
562   }
563   // empty input and output is okay for all flush values.
564   EXPECT_FALSE(codec_->uncompressStream(emptyInput, emptyOutput));
565   EXPECT_FALSE(codec_->uncompressStream(
566       emptyInput, emptyOutput, StreamCodec::FlushOp::FLUSH));
567   EXPECT_FALSE(codec_->uncompressStream(
568       emptyInput, emptyOutput, StreamCodec::FlushOp::END));
569 }
570
571 TEST_P(StreamingUnitTest, stateTransitions) {
572   auto inBuffer = IOBuf::create(1);
573   inBuffer->writableData()[0] = 'a';
574   inBuffer->append(1);
575   auto compressed = codec_->compress(inBuffer.get());
576   ByteRange const in = compressed->coalesce();
577   auto outBuffer = IOBuf::create(codec_->maxCompressedLength(in.size()));
578   MutableByteRange const out{outBuffer->writableTail(), outBuffer->tailroom()};
579
580   auto compress = [&](
581       StreamCodec::FlushOp flushOp = StreamCodec::FlushOp::NONE,
582       bool empty = false) {
583     auto input = in;
584     auto output = empty ? MutableByteRange{} : out;
585     return codec_->compressStream(input, output, flushOp);
586   };
587   auto uncompress = [&](
588       StreamCodec::FlushOp flushOp = StreamCodec::FlushOp::NONE,
589       bool empty = false) {
590     auto input = in;
591     auto output = empty ? MutableByteRange{} : out;
592     return codec_->uncompressStream(input, output, flushOp);
593   };
594
595   // compression flow
596   codec_->resetStream();
597   EXPECT_FALSE(compress());
598   EXPECT_FALSE(compress());
599   EXPECT_TRUE(compress(StreamCodec::FlushOp::FLUSH));
600   EXPECT_FALSE(compress());
601   EXPECT_TRUE(compress(StreamCodec::FlushOp::END));
602   // uncompression flow
603   codec_->resetStream();
604   EXPECT_FALSE(uncompress(StreamCodec::FlushOp::NONE, true));
605   codec_->resetStream();
606   EXPECT_FALSE(uncompress(StreamCodec::FlushOp::FLUSH, true));
607   codec_->resetStream();
608   EXPECT_FALSE(uncompress(StreamCodec::FlushOp::NONE, true));
609   codec_->resetStream();
610   EXPECT_FALSE(uncompress(StreamCodec::FlushOp::NONE, true));
611   codec_->resetStream();
612   EXPECT_TRUE(uncompress(StreamCodec::FlushOp::FLUSH));
613   // compress -> uncompress
614   codec_->resetStream();
615   EXPECT_FALSE(compress());
616   EXPECT_THROW(uncompress(), std::logic_error);
617   // uncompress -> compress
618   codec_->resetStream();
619   EXPECT_TRUE(uncompress(StreamCodec::FlushOp::FLUSH));
620   EXPECT_THROW(compress(), std::logic_error);
621   // end -> compress
622   codec_->resetStream();
623   EXPECT_FALSE(compress());
624   EXPECT_TRUE(compress(StreamCodec::FlushOp::END));
625   EXPECT_THROW(compress(), std::logic_error);
626   // end -> uncompress
627   codec_->resetStream();
628   EXPECT_TRUE(uncompress(StreamCodec::FlushOp::FLUSH));
629   EXPECT_THROW(uncompress(), std::logic_error);
630   // flush -> compress
631   codec_->resetStream();
632   EXPECT_FALSE(compress(StreamCodec::FlushOp::FLUSH, true));
633   EXPECT_THROW(compress(), std::logic_error);
634   // flush -> end
635   codec_->resetStream();
636   EXPECT_FALSE(compress(StreamCodec::FlushOp::FLUSH, true));
637   EXPECT_THROW(compress(StreamCodec::FlushOp::END), std::logic_error);
638   // undefined -> compress
639   codec_->compress(inBuffer.get());
640   EXPECT_THROW(compress(), std::logic_error);
641   codec_->uncompress(compressed.get());
642   EXPECT_THROW(compress(), std::logic_error);
643   // undefined -> undefined
644   codec_->uncompress(compressed.get());
645   codec_->compress(inBuffer.get());
646 }
647
648 INSTANTIATE_TEST_CASE_P(
649     StreamingUnitTest,
650     StreamingUnitTest,
651     testing::ValuesIn(availableStreamCodecs()));
652
653 class StreamingCompressionTest
654     : public testing::TestWithParam<std::tuple<int, int, CodecType>> {
655  protected:
656   void SetUp() override {
657     auto const tup = GetParam();
658     uncompressedLength_ = uint64_t(1) << std::get<0>(tup);
659     chunkSize_ = size_t(1) << std::get<1>(tup);
660     codec_ = getStreamCodec(std::get<2>(tup));
661   }
662
663   void runResetStreamTest(DataHolder const& dh);
664   void runCompressStreamTest(DataHolder const& dh);
665   void runUncompressStreamTest(DataHolder const& dh);
666   void runFlushTest(DataHolder const& dh);
667
668  private:
669   std::vector<ByteRange> split(ByteRange data) const;
670
671   uint64_t uncompressedLength_;
672   size_t chunkSize_;
673   std::unique_ptr<StreamCodec> codec_;
674 };
675
676 std::vector<ByteRange> StreamingCompressionTest::split(ByteRange data) const {
677   size_t const pieces = std::max<size_t>(1, data.size() / chunkSize_);
678   std::vector<ByteRange> result;
679   result.reserve(pieces + 1);
680   while (!data.empty()) {
681     size_t const pieceSize = std::min(data.size(), chunkSize_);
682     result.push_back(data.subpiece(0, pieceSize));
683     data.uncheckedAdvance(pieceSize);
684   }
685   return result;
686 }
687
688 static std::unique_ptr<IOBuf> compressSome(
689     StreamCodec* codec,
690     ByteRange data,
691     uint64_t bufferSize,
692     StreamCodec::FlushOp flush) {
693   bool result;
694   IOBufQueue queue;
695   do {
696     auto buffer = IOBuf::create(bufferSize);
697     buffer->append(buffer->capacity());
698     MutableByteRange output{buffer->writableData(), buffer->length()};
699
700     result = codec->compressStream(data, output, flush);
701     buffer->trimEnd(output.size());
702     queue.append(std::move(buffer));
703
704   } while (!(flush == StreamCodec::FlushOp::NONE && data.empty()) && !result);
705   EXPECT_TRUE(data.empty());
706   return queue.move();
707 }
708
709 static std::pair<bool, std::unique_ptr<IOBuf>> uncompressSome(
710     StreamCodec* codec,
711     ByteRange& data,
712     uint64_t bufferSize,
713     StreamCodec::FlushOp flush) {
714   bool result;
715   IOBufQueue queue;
716   do {
717     auto buffer = IOBuf::create(bufferSize);
718     buffer->append(buffer->capacity());
719     MutableByteRange output{buffer->writableData(), buffer->length()};
720
721     result = codec->uncompressStream(data, output, flush);
722     buffer->trimEnd(output.size());
723     queue.append(std::move(buffer));
724
725   } while (queue.tailroom() == 0 && !result);
726   return std::make_pair(result, queue.move());
727 }
728
729 void StreamingCompressionTest::runResetStreamTest(DataHolder const& dh) {
730   auto const input = dh.data(uncompressedLength_);
731   // Compress some but leave state unclean
732   codec_->resetStream(uncompressedLength_);
733   compressSome(codec_.get(), input, chunkSize_, StreamCodec::FlushOp::NONE);
734   // Reset stream and compress all
735   codec_->resetStream();
736   auto compressed =
737       compressSome(codec_.get(), input, chunkSize_, StreamCodec::FlushOp::END);
738   auto const uncompressed = codec_->uncompress(compressed.get(), input.size());
739   EXPECT_EQ(dh.hash(uncompressedLength_), hashIOBuf(uncompressed.get()));
740 }
741
742 TEST_P(StreamingCompressionTest, resetStream) {
743   runResetStreamTest(constantDataHolder);
744   runResetStreamTest(randomDataHolder);
745 }
746
747 void StreamingCompressionTest::runCompressStreamTest(
748     const folly::io::test::DataHolder& dh) {
749   auto const inputs = split(dh.data(uncompressedLength_));
750
751   IOBufQueue queue;
752   codec_->resetStream(uncompressedLength_);
753   // Compress many inputs in a row
754   for (auto const input : inputs) {
755     queue.append(compressSome(
756         codec_.get(), input, chunkSize_, StreamCodec::FlushOp::NONE));
757   }
758   // Finish the operation with empty input.
759   ByteRange empty;
760   queue.append(
761       compressSome(codec_.get(), empty, chunkSize_, StreamCodec::FlushOp::END));
762
763   auto const uncompressed = codec_->uncompress(queue.front());
764   EXPECT_EQ(dh.hash(uncompressedLength_), hashIOBuf(uncompressed.get()));
765 }
766
767 TEST_P(StreamingCompressionTest, compressStream) {
768   runCompressStreamTest(constantDataHolder);
769   runCompressStreamTest(randomDataHolder);
770 }
771
772 void StreamingCompressionTest::runUncompressStreamTest(
773     const folly::io::test::DataHolder& dh) {
774   auto const data = IOBuf::wrapBuffer(dh.data(uncompressedLength_));
775   // Concatenate 3 compressed frames in a row
776   auto compressed = codec_->compress(data.get());
777   compressed->prependChain(codec_->compress(data.get()));
778   compressed->prependChain(codec_->compress(data.get()));
779   // Pass all 3 compressed frames in one input buffer
780   auto input = compressed->coalesce();
781   // Uncompress the first frame
782   codec_->resetStream(data->computeChainDataLength());
783   {
784     auto const result = uncompressSome(
785         codec_.get(), input, chunkSize_, StreamCodec::FlushOp::FLUSH);
786     ASSERT_TRUE(result.first);
787     ASSERT_EQ(hashIOBuf(data.get()), hashIOBuf(result.second.get()));
788   }
789   // Uncompress the second frame
790   codec_->resetStream();
791   {
792     auto const result = uncompressSome(
793         codec_.get(), input, chunkSize_, StreamCodec::FlushOp::END);
794     ASSERT_TRUE(result.first);
795     ASSERT_EQ(hashIOBuf(data.get()), hashIOBuf(result.second.get()));
796   }
797   // Uncompress the third frame
798   codec_->resetStream();
799   {
800     auto const result = uncompressSome(
801         codec_.get(), input, chunkSize_, StreamCodec::FlushOp::FLUSH);
802     ASSERT_TRUE(result.first);
803     ASSERT_EQ(hashIOBuf(data.get()), hashIOBuf(result.second.get()));
804   }
805   EXPECT_TRUE(input.empty());
806 }
807
808 TEST_P(StreamingCompressionTest, uncompressStream) {
809   runUncompressStreamTest(constantDataHolder);
810   runUncompressStreamTest(randomDataHolder);
811 }
812
813 void StreamingCompressionTest::runFlushTest(DataHolder const& dh) {
814   auto const inputs = split(dh.data(uncompressedLength_));
815   auto uncodec = getStreamCodec(codec_->type());
816
817   codec_->resetStream();
818   for (auto input : inputs) {
819     // Compress some data and flush the stream
820     auto compressed = compressSome(
821         codec_.get(), input, chunkSize_, StreamCodec::FlushOp::FLUSH);
822     auto compressedRange = compressed->coalesce();
823     // Uncompress the compressed data
824     auto result = uncompressSome(
825         uncodec.get(),
826         compressedRange,
827         chunkSize_,
828         StreamCodec::FlushOp::FLUSH);
829     // All compressed data should have been consumed
830     EXPECT_TRUE(compressedRange.empty());
831     // The frame isn't complete
832     EXPECT_FALSE(result.first);
833     // The uncompressed data should be exactly the input data
834     EXPECT_EQ(input.size(), result.second->computeChainDataLength());
835     auto const data = IOBuf::wrapBuffer(input);
836     EXPECT_EQ(hashIOBuf(data.get()), hashIOBuf(result.second.get()));
837   }
838 }
839
840 TEST_P(StreamingCompressionTest, testFlush) {
841   runFlushTest(constantDataHolder);
842   runFlushTest(randomDataHolder);
843 }
844
845 INSTANTIATE_TEST_CASE_P(
846     StreamingCompressionTest,
847     StreamingCompressionTest,
848     testing::Combine(
849         testing::Values(0, 1, 12, 22, 27),
850         testing::Values(12, 17, 20),
851         testing::ValuesIn(availableStreamCodecs())));
852
853 class AutomaticCodecTest : public testing::TestWithParam<CodecType> {
854  protected:
855   void SetUp() override {
856     codec_ = getCodec(GetParam());
857     auto_ = getAutoUncompressionCodec();
858   }
859
860   void runSimpleTest(const DataHolder& dh);
861
862   std::unique_ptr<Codec> codec_;
863   std::unique_ptr<Codec> auto_;
864 };
865
866 void AutomaticCodecTest::runSimpleTest(const DataHolder& dh) {
867   constexpr uint64_t uncompressedLength = 1000;
868   auto original = IOBuf::wrapBuffer(dh.data(uncompressedLength));
869   auto compressed = codec_->compress(original.get());
870
871   if (!codec_->needsUncompressedLength()) {
872     auto uncompressed = auto_->uncompress(compressed.get());
873     EXPECT_EQ(uncompressedLength, uncompressed->computeChainDataLength());
874     EXPECT_EQ(dh.hash(uncompressedLength), hashIOBuf(uncompressed.get()));
875   }
876   {
877     auto uncompressed = auto_->uncompress(compressed.get(), uncompressedLength);
878     EXPECT_EQ(uncompressedLength, uncompressed->computeChainDataLength());
879     EXPECT_EQ(dh.hash(uncompressedLength), hashIOBuf(uncompressed.get()));
880   }
881   ASSERT_GE(compressed->computeChainDataLength(), 8);
882   for (size_t i = 0; i < 8; ++i) {
883     auto split = compressed->clone();
884     auto rest = compressed->clone();
885     split->trimEnd(split->length() - i);
886     rest->trimStart(i);
887     split->appendChain(std::move(rest));
888     auto uncompressed = auto_->uncompress(split.get(), uncompressedLength);
889     EXPECT_EQ(uncompressedLength, uncompressed->computeChainDataLength());
890     EXPECT_EQ(dh.hash(uncompressedLength), hashIOBuf(uncompressed.get()));
891   }
892 }
893
894 TEST_P(AutomaticCodecTest, RandomData) {
895   runSimpleTest(randomDataHolder);
896 }
897
898 TEST_P(AutomaticCodecTest, ConstantData) {
899   runSimpleTest(constantDataHolder);
900 }
901
902 TEST_P(AutomaticCodecTest, ValidPrefixes) {
903   const auto prefixes = codec_->validPrefixes();
904   for (const auto& prefix : prefixes) {
905     EXPECT_FALSE(prefix.empty());
906     // Ensure that all strings are at least 8 bytes for LZMA2.
907     // The bytes after the prefix should be ignored by `canUncompress()`.
908     IOBuf data{IOBuf::COPY_BUFFER, prefix, 0, 8};
909     data.append(8);
910     EXPECT_TRUE(codec_->canUncompress(&data));
911     EXPECT_TRUE(auto_->canUncompress(&data));
912   }
913 }
914
915 TEST_P(AutomaticCodecTest, NeedsUncompressedLength) {
916   if (codec_->needsUncompressedLength()) {
917     EXPECT_TRUE(auto_->needsUncompressedLength());
918   }
919 }
920
921 TEST_P(AutomaticCodecTest, maxUncompressedLength) {
922   EXPECT_LE(codec_->maxUncompressedLength(), auto_->maxUncompressedLength());
923 }
924
925 TEST_P(AutomaticCodecTest, DefaultCodec) {
926   const uint64_t length = 42;
927   std::vector<std::unique_ptr<Codec>> codecs;
928   codecs.push_back(getCodec(CodecType::ZSTD));
929   auto automatic = getAutoUncompressionCodec(std::move(codecs));
930   auto original = IOBuf::wrapBuffer(constantDataHolder.data(length));
931   auto compressed = codec_->compress(original.get());
932   auto decompressed = automatic->uncompress(compressed.get());
933
934   EXPECT_EQ(constantDataHolder.hash(length), hashIOBuf(decompressed.get()));
935 }
936
937 namespace {
938 class CustomCodec : public Codec {
939  public:
940   static std::unique_ptr<Codec> create(std::string prefix, CodecType type) {
941     return std::make_unique<CustomCodec>(std::move(prefix), type);
942   }
943   explicit CustomCodec(std::string prefix, CodecType type)
944       : Codec(CodecType::USER_DEFINED),
945         prefix_(std::move(prefix)),
946         codec_(getCodec(type)) {}
947
948  private:
949   std::vector<std::string> validPrefixes() const override {
950     return {prefix_};
951   }
952
953   uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override {
954     return codec_->maxCompressedLength(uncompressedLength) + prefix_.size();
955   }
956
957   bool canUncompress(const IOBuf* data, Optional<uint64_t>) const override {
958     auto clone = data->cloneCoalescedAsValue();
959     if (clone.length() < prefix_.size()) {
960       return false;
961     }
962     return memcmp(clone.data(), prefix_.data(), prefix_.size()) == 0;
963   }
964
965   std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override {
966     auto result = IOBuf::copyBuffer(prefix_);
967     result->appendChain(codec_->compress(data));
968     EXPECT_TRUE(canUncompress(result.get(), data->computeChainDataLength()));
969     return result;
970   }
971
972   std::unique_ptr<IOBuf> doUncompress(
973       const IOBuf* data,
974       Optional<uint64_t> uncompressedLength) override {
975     EXPECT_TRUE(canUncompress(data, uncompressedLength));
976     auto clone = data->cloneCoalescedAsValue();
977     clone.trimStart(prefix_.size());
978     return codec_->uncompress(&clone, uncompressedLength);
979   }
980
981   std::string prefix_;
982   std::unique_ptr<Codec> codec_;
983 };
984 }
985
986 TEST_P(AutomaticCodecTest, CustomCodec) {
987   const uint64_t length = 42;
988   auto ab = CustomCodec::create("ab", CodecType::ZSTD);
989   std::vector<std::unique_ptr<Codec>> codecs;
990   codecs.push_back(CustomCodec::create("ab", CodecType::ZSTD));
991   auto automatic = getAutoUncompressionCodec(std::move(codecs));
992   auto original = IOBuf::wrapBuffer(constantDataHolder.data(length));
993
994   auto abCompressed = ab->compress(original.get());
995   auto abDecompressed = automatic->uncompress(abCompressed.get());
996   EXPECT_TRUE(automatic->canUncompress(abCompressed.get()));
997   EXPECT_FALSE(auto_->canUncompress(abCompressed.get()));
998   EXPECT_EQ(constantDataHolder.hash(length), hashIOBuf(abDecompressed.get()));
999
1000   auto compressed = codec_->compress(original.get());
1001   auto decompressed = automatic->uncompress(compressed.get());
1002   EXPECT_EQ(constantDataHolder.hash(length), hashIOBuf(decompressed.get()));
1003 }
1004
1005 TEST_P(AutomaticCodecTest, CustomDefaultCodec) {
1006   const uint64_t length = 42;
1007   auto none = CustomCodec::create("none", CodecType::NO_COMPRESSION);
1008   std::vector<std::unique_ptr<Codec>> codecs;
1009   codecs.push_back(CustomCodec::create("none", CodecType::NO_COMPRESSION));
1010   codecs.push_back(getCodec(CodecType::LZ4_FRAME));
1011   auto automatic = getAutoUncompressionCodec(std::move(codecs));
1012   auto original = IOBuf::wrapBuffer(constantDataHolder.data(length));
1013
1014   auto noneCompressed = none->compress(original.get());
1015   auto noneDecompressed = automatic->uncompress(noneCompressed.get());
1016   EXPECT_TRUE(automatic->canUncompress(noneCompressed.get()));
1017   EXPECT_FALSE(auto_->canUncompress(noneCompressed.get()));
1018   EXPECT_EQ(constantDataHolder.hash(length), hashIOBuf(noneDecompressed.get()));
1019
1020   auto compressed = codec_->compress(original.get());
1021   auto decompressed = automatic->uncompress(compressed.get());
1022   EXPECT_EQ(constantDataHolder.hash(length), hashIOBuf(decompressed.get()));
1023 }
1024
1025 TEST_P(AutomaticCodecTest, canUncompressOneBytes) {
1026   // No default codec can uncompress 1 bytes.
1027   IOBuf buf{IOBuf::CREATE, 1};
1028   buf.append(1);
1029   EXPECT_FALSE(codec_->canUncompress(&buf, 1));
1030   EXPECT_FALSE(codec_->canUncompress(&buf, folly::none));
1031   EXPECT_FALSE(auto_->canUncompress(&buf, 1));
1032   EXPECT_FALSE(auto_->canUncompress(&buf, folly::none));
1033 }
1034
1035 INSTANTIATE_TEST_CASE_P(
1036     AutomaticCodecTest,
1037     AutomaticCodecTest,
1038     testing::Values(
1039         CodecType::LZ4_FRAME,
1040         CodecType::ZSTD,
1041         CodecType::ZLIB,
1042         CodecType::GZIP,
1043         CodecType::LZMA2,
1044         CodecType::BZIP2));
1045
1046 TEST(ValidPrefixesTest, CustomCodec) {
1047   std::vector<std::unique_ptr<Codec>> codecs;
1048   codecs.push_back(CustomCodec::create("none", CodecType::NO_COMPRESSION));
1049   const auto none = getAutoUncompressionCodec(std::move(codecs));
1050   const auto prefixes = none->validPrefixes();
1051   const auto it = std::find(prefixes.begin(), prefixes.end(), "none");
1052   EXPECT_TRUE(it != prefixes.end());
1053 }
1054
1055 #define EXPECT_THROW_IF_DEBUG(statement, expected_exception) \
1056   do {                                                       \
1057     if (kIsDebug) {                                          \
1058       EXPECT_THROW((statement), expected_exception);         \
1059     } else {                                                 \
1060       EXPECT_NO_THROW((statement));                          \
1061     }                                                        \
1062   } while (false)
1063
1064 TEST(CheckCompatibleTest, SimplePrefixSecond) {
1065   std::vector<std::unique_ptr<Codec>> codecs;
1066   codecs.push_back(CustomCodec::create("abc", CodecType::NO_COMPRESSION));
1067   codecs.push_back(CustomCodec::create("ab", CodecType::NO_COMPRESSION));
1068   EXPECT_THROW_IF_DEBUG(
1069       getAutoUncompressionCodec(std::move(codecs)), std::invalid_argument);
1070 }
1071
1072 TEST(CheckCompatibleTest, SimplePrefixFirst) {
1073   std::vector<std::unique_ptr<Codec>> codecs;
1074   codecs.push_back(CustomCodec::create("ab", CodecType::NO_COMPRESSION));
1075   codecs.push_back(CustomCodec::create("abc", CodecType::NO_COMPRESSION));
1076   EXPECT_THROW_IF_DEBUG(
1077       getAutoUncompressionCodec(std::move(codecs)), std::invalid_argument);
1078 }
1079
1080 TEST(CheckCompatibleTest, Empty) {
1081   std::vector<std::unique_ptr<Codec>> codecs;
1082   codecs.push_back(CustomCodec::create("", CodecType::NO_COMPRESSION));
1083   EXPECT_THROW_IF_DEBUG(
1084       getAutoUncompressionCodec(std::move(codecs)), std::invalid_argument);
1085 }
1086
1087 TEST(CheckCompatibleTest, ZstdPrefix) {
1088   std::vector<std::unique_ptr<Codec>> codecs;
1089   codecs.push_back(CustomCodec::create("\x28\xB5\x2F", CodecType::ZSTD));
1090   EXPECT_THROW_IF_DEBUG(
1091       getAutoUncompressionCodec(std::move(codecs)), std::invalid_argument);
1092 }
1093
1094 TEST(CheckCompatibleTest, ZstdDuplicate) {
1095   std::vector<std::unique_ptr<Codec>> codecs;
1096   codecs.push_back(CustomCodec::create("\x28\xB5\x2F\xFD", CodecType::ZSTD));
1097   EXPECT_THROW_IF_DEBUG(
1098       getAutoUncompressionCodec(std::move(codecs)), std::invalid_argument);
1099 }
1100
1101 TEST(CheckCompatibleTest, ZlibIsPrefix) {
1102   std::vector<std::unique_ptr<Codec>> codecs;
1103   codecs.push_back(CustomCodec::create("\x18\x76zzasdf", CodecType::ZSTD));
1104   EXPECT_THROW_IF_DEBUG(
1105       getAutoUncompressionCodec(std::move(codecs)), std::invalid_argument);
1106 }
1107
1108 #if FOLLY_HAVE_LIBZSTD
1109
1110 TEST(ZstdTest, BackwardCompatible) {
1111   auto codec = getCodec(CodecType::ZSTD);
1112   {
1113     auto const data = IOBuf::wrapBuffer(randomDataHolder.data(size_t(1) << 20));
1114     auto compressed = codec->compress(data.get());
1115     compressed->coalesce();
1116     EXPECT_EQ(
1117         data->length(),
1118         ZSTD_getDecompressedSize(compressed->data(), compressed->length()));
1119   }
1120   {
1121     auto const data =
1122         IOBuf::wrapBuffer(randomDataHolder.data(size_t(100) << 20));
1123     auto compressed = codec->compress(data.get());
1124     compressed->coalesce();
1125     EXPECT_EQ(
1126         data->length(),
1127         ZSTD_getDecompressedSize(compressed->data(), compressed->length()));
1128   }
1129 }
1130
1131 #endif
1132 } // namespace test
1133 } // namespace io
1134 } // namespace folly
1135
1136 int main(int argc, char *argv[]) {
1137   testing::InitGoogleTest(&argc, argv);
1138   gflags::ParseCommandLineFlags(&argc, &argv, true);
1139
1140   auto ret = RUN_ALL_TESTS();
1141   if (!ret) {
1142     folly::runBenchmarksOnFlag();
1143   }
1144   return ret;
1145 }