Enforce forward progress with StreamCodec
authorStella Lau <laus@fb.com>
Fri, 25 Aug 2017 16:23:46 +0000 (09:23 -0700)
committerFacebook Github Bot <facebook-github-bot@users.noreply.github.com>
Fri, 25 Aug 2017 16:34:57 +0000 (09:34 -0700)
Summary:
- Throw exception if no forward progress was made with `StreamCodec.compress()` and `StreamCodec.uncompress()`
- Prevents infinite looping behavior when no forward progress was made
- Update tests

Reviewed By: terrelln

Differential Revision: D5685690

fbshipit-source-id: 969393896b74f51250f0e0ce3af0cd4fedcab49a

folly/io/Compression.cpp
folly/io/Compression.h
folly/io/test/CompressionTest.cpp

index 883d0da3ab5a876e1ac214fc924be6612f926a44..b41af0b91b66d6eb240d7b033b783e53c7cde95b 100644 (file)
@@ -234,6 +234,7 @@ void StreamCodec::assertStateIs(State expected) const {
 void StreamCodec::resetStream(Optional<uint64_t> uncompressedLength) {
   state_ = State::RESET;
   uncompressedLength_ = uncompressedLength;
+  progressMade_ = true;
   doResetStream();
 }
 
@@ -279,7 +280,18 @@ bool StreamCodec::compressStream(
       assertStateIs(State::COMPRESS_END);
       break;
   }
+  size_t const inputSize = input.size();
+  size_t const outputSize = output.size();
   bool const done = doCompressStream(input, output, flushOp);
+  if (!done && inputSize == input.size() && outputSize == output.size()) {
+    if (!progressMade_) {
+      throw std::runtime_error("Codec: No forward progress made");
+    }
+    // Throw an exception if there is no progress again next time
+    progressMade_ = false;
+  } else {
+    progressMade_ = true;
+  }
   // Handle output state transitions
   if (done) {
     if (state_ == State::COMPRESS_FLUSH) {
@@ -309,7 +321,18 @@ bool StreamCodec::uncompressStream(
     state_ = State::UNCOMPRESS;
   }
   assertStateIs(State::UNCOMPRESS);
+  size_t const inputSize = input.size();
+  size_t const outputSize = output.size();
   bool const done = doUncompressStream(input, output, flushOp);
+  if (!done && inputSize == input.size() && outputSize == output.size()) {
+    if (!progressMade_) {
+      throw std::runtime_error("Codec: no forward progress made");
+    }
+    // Throw an exception if there is no progress again next time
+    progressMade_ = false;
+  } else {
+    progressMade_ = true;
+  }
   // Handle output state transitions
   if (done) {
     state_ = State::END;
@@ -345,7 +368,8 @@ std::unique_ptr<IOBuf> StreamCodec::doCompress(IOBuf const* data) {
   IOBuf const* current = data;
   ByteRange input{current->data(), current->length()};
   StreamCodec::FlushOp flushOp = StreamCodec::FlushOp::NONE;
-  for (;;) {
+  bool done = false;
+  while (!done) {
     while (input.empty() && current->next() != data) {
       current = current->next();
       input = {current->data(), current->length()};
@@ -357,17 +381,11 @@ std::unique_ptr<IOBuf> StreamCodec::doCompress(IOBuf const* data) {
     if (output.empty()) {
       buffer->prependChain(addOutputBuffer(output, kDefaultBufferLength));
     }
-    size_t const inputSize = input.size();
-    size_t const outputSize = output.size();
-    bool const done = compressStream(input, output, flushOp);
+    done = compressStream(input, output, flushOp);
     if (done) {
       DCHECK(input.empty());
       DCHECK(flushOp == StreamCodec::FlushOp::END);
       DCHECK_EQ(current->next(), data);
-      break;
-    }
-    if (inputSize == input.size() && outputSize == output.size()) {
-      throw std::runtime_error("Codec: No forward progress made");
     }
   }
   buffer->prev()->trimEnd(output.size());
@@ -404,7 +422,8 @@ std::unique_ptr<IOBuf> StreamCodec::doUncompress(
   IOBuf const* current = data;
   ByteRange input{current->data(), current->length()};
   StreamCodec::FlushOp flushOp = StreamCodec::FlushOp::NONE;
-  for (;;) {
+  bool done = false;
+  while (!done) {
     while (input.empty() && current->next() != data) {
       current = current->next();
       input = {current->data(), current->length()};
@@ -416,15 +435,7 @@ std::unique_ptr<IOBuf> StreamCodec::doUncompress(
     if (output.empty()) {
       buffer->prependChain(addOutputBuffer(output, defaultBufferLength));
     }
-    size_t const inputSize = input.size();
-    size_t const outputSize = output.size();
-    bool const done = uncompressStream(input, output, flushOp);
-    if (done) {
-      break;
-    }
-    if (inputSize == input.size() && outputSize == output.size()) {
-      throw std::runtime_error("Codec: Truncated data");
-    }
+    done = uncompressStream(input, output, flushOp);
   }
   if (!input.empty()) {
     throw std::runtime_error("Codec: Junk after end of data");
index b8ff64553c0e0446ac660fb793b140b195a521b1..462bb5d011b7a097c28e067465681255bc1edfe0 100644 (file)
@@ -304,7 +304,8 @@ class StreamCodec : public Codec {
    * flushOp.
    *
    * A std::logic_error is thrown on incorrect usage of the API.
-   * A std::runtime_error is thrown upon error conditions.
+   * A std::runtime_error is thrown upon error conditions or if no forward
+   * progress could be made twice in a row.
    */
   bool compressStream(
       folly::ByteRange& input,
@@ -340,6 +341,10 @@ class StreamCodec : public Codec {
    * compressStream() with flushOp FLUSH. Most users don't need to use this
    * flushOp.
    *
+   * A std::runtime_error is thrown upon error conditions or if no forward
+   * progress could be made upon two consecutive calls to the function (only the
+   * second call will throw an exception).
+   *
    * Returns true at the end of a frame. At this point resetStream() must be
    * called to reuse the codec.
    */
@@ -390,6 +395,7 @@ class StreamCodec : public Codec {
   State state_{State::RESET};
   ByteRange previousInput_{};
   folly::Optional<uint64_t> uncompressedLength_{};
+  bool progressMade_{true};
 };
 
 constexpr int COMPRESSION_LEVEL_FASTEST = -1;
index c8deed919619bc0ffb47800b43e7895ae9bc8ed1..6a1ff71fedc58404ad97cccfd8c1c063c4d0b07c 100644 (file)
@@ -562,66 +562,68 @@ TEST_P(StreamingUnitTest, emptyData) {
       codec_->uncompressStream(input, output, StreamCodec::FlushOp::END));
 }
 
-TEST_P(StreamingUnitTest, noForwardProgressOkay) {
+TEST_P(StreamingUnitTest, noForwardProgress) {
   auto inBuffer = IOBuf::create(2);
   inBuffer->writableData()[0] = 'a';
-  inBuffer->writableData()[0] = 'a';
+  inBuffer->writableData()[1] = 'a';
   inBuffer->append(2);
-  auto input = inBuffer->coalesce();
-  auto compressed = codec_->compress(inBuffer.get());
-
+  const auto compressed = codec_->compress(inBuffer.get());
   auto outBuffer = IOBuf::create(codec_->maxCompressedLength(2));
-  MutableByteRange output{outBuffer->writableTail(), outBuffer->tailroom()};
 
   ByteRange emptyInput;
   MutableByteRange emptyOutput;
 
-  // Compress some data to avoid empty data special casing
-  if (codec_->needsDataLength()) {
-    codec_->resetStream(inBuffer->computeChainDataLength());
-  } else {
-    codec_->resetStream();
-  }
-  while (!input.empty()) {
-    codec_->compressStream(input, output);
+  const std::array<StreamCodec::FlushOp, 3> flushOps = {{
+      StreamCodec::FlushOp::NONE,
+      StreamCodec::FlushOp::FLUSH,
+      StreamCodec::FlushOp::END,
+  }};
+
+  // No progress is not okay twice in a row for all flush operations when
+  // compressing
+  for (const auto flushOp : flushOps) {
+    if (codec_->needsDataLength()) {
+      codec_->resetStream(inBuffer->computeChainDataLength());
+    } else {
+      codec_->resetStream();
+    }
+    auto input = inBuffer->coalesce();
+    MutableByteRange output = {outBuffer->writableTail(),
+                               outBuffer->tailroom()};
+    // Compress some data to avoid empty data special casing
+    while (!input.empty()) {
+      codec_->compressStream(input, output);
+    }
+    EXPECT_FALSE(codec_->compressStream(emptyInput, emptyOutput, flushOp));
+    EXPECT_THROW(
+        codec_->compressStream(emptyInput, emptyOutput, flushOp),
+        std::runtime_error);
   }
-  // empty input and output is okay for flush NONE and FLUSH.
-  codec_->compressStream(emptyInput, emptyOutput);
-  codec_->compressStream(emptyInput, emptyOutput, StreamCodec::FlushOp::FLUSH);
 
-  if (codec_->needsDataLength()) {
-    codec_->resetStream(inBuffer->computeChainDataLength());
-  } else {
+  // No progress is not okay twice in a row for all flush operations when
+  // uncompressing
+  for (const auto flushOp : flushOps) {
     codec_->resetStream();
+    auto input = compressed->coalesce();
+    // Remove the last byte so the operation is incomplete
+    input.uncheckedSubtract(1);
+    MutableByteRange output = {inBuffer->writableData(), inBuffer->length()};
+    // Uncompress some data to avoid empty data special casing
+    while (!input.empty()) {
+      EXPECT_FALSE(codec_->uncompressStream(input, output));
+    }
+    EXPECT_FALSE(codec_->uncompressStream(emptyInput, emptyOutput, flushOp));
+    EXPECT_THROW(
+        codec_->uncompressStream(emptyInput, emptyOutput, flushOp),
+        std::runtime_error);
   }
-  input = inBuffer->coalesce();
-  output = {outBuffer->writableTail(), outBuffer->tailroom()};
-  while (!input.empty()) {
-    codec_->compressStream(input, output);
-  }
-  // empty input and output is okay for flush END.
-  codec_->compressStream(emptyInput, emptyOutput, StreamCodec::FlushOp::END);
-
-  codec_->resetStream();
-  input = compressed->coalesce();
-  input.uncheckedSubtract(1); // Remove last byte so the operation is incomplete
-  output = {inBuffer->writableData(), inBuffer->length()};
-  // Uncompress some data to avoid empty data special casing
-  while (!input.empty()) {
-    EXPECT_FALSE(codec_->uncompressStream(input, output));
-  }
-  // empty input and output is okay for all flush values.
-  EXPECT_FALSE(codec_->uncompressStream(emptyInput, emptyOutput));
-  EXPECT_FALSE(codec_->uncompressStream(
-      emptyInput, emptyOutput, StreamCodec::FlushOp::FLUSH));
-  EXPECT_FALSE(codec_->uncompressStream(
-      emptyInput, emptyOutput, StreamCodec::FlushOp::END));
 }
 
 TEST_P(StreamingUnitTest, stateTransitions) {
-  auto inBuffer = IOBuf::create(1);
+  auto inBuffer = IOBuf::create(2);
   inBuffer->writableData()[0] = 'a';
-  inBuffer->append(1);
+  inBuffer->writableData()[1] = 'a';
+  inBuffer->append(2);
   auto compressed = codec_->compress(inBuffer.get());
   ByteRange const in = compressed->coalesce();
   auto outBuffer = IOBuf::create(codec_->maxCompressedLength(in.size()));