From: Stella Lau Date: Fri, 25 Aug 2017 16:23:46 +0000 (-0700) Subject: Enforce forward progress with StreamCodec X-Git-Tag: v2017.08.28.00~3 X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=91022bb0ee7086fead232194a89f3988acea9612;p=folly.git Enforce forward progress with StreamCodec Summary: - Throw exception if no forward progress was made with `StreamCodec.compress()` and `StreamCodec.uncompress()` - Prevents infinite looping behavior when no forward progress was made - Update tests Reviewed By: terrelln Differential Revision: D5685690 fbshipit-source-id: 969393896b74f51250f0e0ce3af0cd4fedcab49a --- diff --git a/folly/io/Compression.cpp b/folly/io/Compression.cpp index 883d0da3..b41af0b9 100644 --- a/folly/io/Compression.cpp +++ b/folly/io/Compression.cpp @@ -234,6 +234,7 @@ void StreamCodec::assertStateIs(State expected) const { void StreamCodec::resetStream(Optional uncompressedLength) { state_ = State::RESET; uncompressedLength_ = uncompressedLength; + progressMade_ = true; doResetStream(); } @@ -279,7 +280,18 @@ bool StreamCodec::compressStream( assertStateIs(State::COMPRESS_END); break; } + size_t const inputSize = input.size(); + size_t const outputSize = output.size(); bool const done = doCompressStream(input, output, flushOp); + if (!done && inputSize == input.size() && outputSize == output.size()) { + if (!progressMade_) { + throw std::runtime_error("Codec: No forward progress made"); + } + // Throw an exception if there is no progress again next time + progressMade_ = false; + } else { + progressMade_ = true; + } // Handle output state transitions if (done) { if (state_ == State::COMPRESS_FLUSH) { @@ -309,7 +321,18 @@ bool StreamCodec::uncompressStream( state_ = State::UNCOMPRESS; } assertStateIs(State::UNCOMPRESS); + size_t const inputSize = input.size(); + size_t const outputSize = output.size(); bool const done = doUncompressStream(input, output, flushOp); + if (!done && inputSize == input.size() && outputSize == output.size()) { + if (!progressMade_) { + throw std::runtime_error("Codec: no forward progress made"); + } + // Throw an exception if there is no progress again next time + progressMade_ = false; + } else { + progressMade_ = true; + } // Handle output state transitions if (done) { state_ = State::END; @@ -345,7 +368,8 @@ std::unique_ptr StreamCodec::doCompress(IOBuf const* data) { IOBuf const* current = data; ByteRange input{current->data(), current->length()}; StreamCodec::FlushOp flushOp = StreamCodec::FlushOp::NONE; - for (;;) { + bool done = false; + while (!done) { while (input.empty() && current->next() != data) { current = current->next(); input = {current->data(), current->length()}; @@ -357,17 +381,11 @@ std::unique_ptr StreamCodec::doCompress(IOBuf const* data) { if (output.empty()) { buffer->prependChain(addOutputBuffer(output, kDefaultBufferLength)); } - size_t const inputSize = input.size(); - size_t const outputSize = output.size(); - bool const done = compressStream(input, output, flushOp); + done = compressStream(input, output, flushOp); if (done) { DCHECK(input.empty()); DCHECK(flushOp == StreamCodec::FlushOp::END); DCHECK_EQ(current->next(), data); - break; - } - if (inputSize == input.size() && outputSize == output.size()) { - throw std::runtime_error("Codec: No forward progress made"); } } buffer->prev()->trimEnd(output.size()); @@ -404,7 +422,8 @@ std::unique_ptr StreamCodec::doUncompress( IOBuf const* current = data; ByteRange input{current->data(), current->length()}; StreamCodec::FlushOp flushOp = StreamCodec::FlushOp::NONE; - for (;;) { + bool done = false; + while (!done) { while (input.empty() && current->next() != data) { current = current->next(); input = {current->data(), current->length()}; @@ -416,15 +435,7 @@ std::unique_ptr StreamCodec::doUncompress( if (output.empty()) { buffer->prependChain(addOutputBuffer(output, defaultBufferLength)); } - size_t const inputSize = input.size(); - size_t const outputSize = output.size(); - bool const done = uncompressStream(input, output, flushOp); - if (done) { - break; - } - if (inputSize == input.size() && outputSize == output.size()) { - throw std::runtime_error("Codec: Truncated data"); - } + done = uncompressStream(input, output, flushOp); } if (!input.empty()) { throw std::runtime_error("Codec: Junk after end of data"); diff --git a/folly/io/Compression.h b/folly/io/Compression.h index b8ff6455..462bb5d0 100644 --- a/folly/io/Compression.h +++ b/folly/io/Compression.h @@ -304,7 +304,8 @@ class StreamCodec : public Codec { * flushOp. * * A std::logic_error is thrown on incorrect usage of the API. - * A std::runtime_error is thrown upon error conditions. + * A std::runtime_error is thrown upon error conditions or if no forward + * progress could be made twice in a row. */ bool compressStream( folly::ByteRange& input, @@ -340,6 +341,10 @@ class StreamCodec : public Codec { * compressStream() with flushOp FLUSH. Most users don't need to use this * flushOp. * + * A std::runtime_error is thrown upon error conditions or if no forward + * progress could be made upon two consecutive calls to the function (only the + * second call will throw an exception). + * * Returns true at the end of a frame. At this point resetStream() must be * called to reuse the codec. */ @@ -390,6 +395,7 @@ class StreamCodec : public Codec { State state_{State::RESET}; ByteRange previousInput_{}; folly::Optional uncompressedLength_{}; + bool progressMade_{true}; }; constexpr int COMPRESSION_LEVEL_FASTEST = -1; diff --git a/folly/io/test/CompressionTest.cpp b/folly/io/test/CompressionTest.cpp index c8deed91..6a1ff71f 100644 --- a/folly/io/test/CompressionTest.cpp +++ b/folly/io/test/CompressionTest.cpp @@ -562,66 +562,68 @@ TEST_P(StreamingUnitTest, emptyData) { codec_->uncompressStream(input, output, StreamCodec::FlushOp::END)); } -TEST_P(StreamingUnitTest, noForwardProgressOkay) { +TEST_P(StreamingUnitTest, noForwardProgress) { auto inBuffer = IOBuf::create(2); inBuffer->writableData()[0] = 'a'; - inBuffer->writableData()[0] = 'a'; + inBuffer->writableData()[1] = 'a'; inBuffer->append(2); - auto input = inBuffer->coalesce(); - auto compressed = codec_->compress(inBuffer.get()); - + const auto compressed = codec_->compress(inBuffer.get()); auto outBuffer = IOBuf::create(codec_->maxCompressedLength(2)); - MutableByteRange output{outBuffer->writableTail(), outBuffer->tailroom()}; ByteRange emptyInput; MutableByteRange emptyOutput; - // Compress some data to avoid empty data special casing - if (codec_->needsDataLength()) { - codec_->resetStream(inBuffer->computeChainDataLength()); - } else { - codec_->resetStream(); - } - while (!input.empty()) { - codec_->compressStream(input, output); + const std::array flushOps = {{ + StreamCodec::FlushOp::NONE, + StreamCodec::FlushOp::FLUSH, + StreamCodec::FlushOp::END, + }}; + + // No progress is not okay twice in a row for all flush operations when + // compressing + for (const auto flushOp : flushOps) { + if (codec_->needsDataLength()) { + codec_->resetStream(inBuffer->computeChainDataLength()); + } else { + codec_->resetStream(); + } + auto input = inBuffer->coalesce(); + MutableByteRange output = {outBuffer->writableTail(), + outBuffer->tailroom()}; + // Compress some data to avoid empty data special casing + while (!input.empty()) { + codec_->compressStream(input, output); + } + EXPECT_FALSE(codec_->compressStream(emptyInput, emptyOutput, flushOp)); + EXPECT_THROW( + codec_->compressStream(emptyInput, emptyOutput, flushOp), + std::runtime_error); } - // empty input and output is okay for flush NONE and FLUSH. - codec_->compressStream(emptyInput, emptyOutput); - codec_->compressStream(emptyInput, emptyOutput, StreamCodec::FlushOp::FLUSH); - if (codec_->needsDataLength()) { - codec_->resetStream(inBuffer->computeChainDataLength()); - } else { + // No progress is not okay twice in a row for all flush operations when + // uncompressing + for (const auto flushOp : flushOps) { codec_->resetStream(); + auto input = compressed->coalesce(); + // Remove the last byte so the operation is incomplete + input.uncheckedSubtract(1); + MutableByteRange output = {inBuffer->writableData(), inBuffer->length()}; + // Uncompress some data to avoid empty data special casing + while (!input.empty()) { + EXPECT_FALSE(codec_->uncompressStream(input, output)); + } + EXPECT_FALSE(codec_->uncompressStream(emptyInput, emptyOutput, flushOp)); + EXPECT_THROW( + codec_->uncompressStream(emptyInput, emptyOutput, flushOp), + std::runtime_error); } - input = inBuffer->coalesce(); - output = {outBuffer->writableTail(), outBuffer->tailroom()}; - while (!input.empty()) { - codec_->compressStream(input, output); - } - // empty input and output is okay for flush END. - codec_->compressStream(emptyInput, emptyOutput, StreamCodec::FlushOp::END); - - codec_->resetStream(); - input = compressed->coalesce(); - input.uncheckedSubtract(1); // Remove last byte so the operation is incomplete - output = {inBuffer->writableData(), inBuffer->length()}; - // Uncompress some data to avoid empty data special casing - while (!input.empty()) { - EXPECT_FALSE(codec_->uncompressStream(input, output)); - } - // empty input and output is okay for all flush values. - EXPECT_FALSE(codec_->uncompressStream(emptyInput, emptyOutput)); - EXPECT_FALSE(codec_->uncompressStream( - emptyInput, emptyOutput, StreamCodec::FlushOp::FLUSH)); - EXPECT_FALSE(codec_->uncompressStream( - emptyInput, emptyOutput, StreamCodec::FlushOp::END)); } TEST_P(StreamingUnitTest, stateTransitions) { - auto inBuffer = IOBuf::create(1); + auto inBuffer = IOBuf::create(2); inBuffer->writableData()[0] = 'a'; - inBuffer->append(1); + inBuffer->writableData()[1] = 'a'; + inBuffer->append(2); auto compressed = codec_->compress(inBuffer.get()); ByteRange const in = compressed->coalesce(); auto outBuffer = IOBuf::create(codec_->maxCompressedLength(in.size()));