void StreamCodec::resetStream(Optional<uint64_t> uncompressedLength) {
state_ = State::RESET;
uncompressedLength_ = uncompressedLength;
+ progressMade_ = true;
doResetStream();
}
assertStateIs(State::COMPRESS_END);
break;
}
+ size_t const inputSize = input.size();
+ size_t const outputSize = output.size();
bool const done = doCompressStream(input, output, flushOp);
+ if (!done && inputSize == input.size() && outputSize == output.size()) {
+ if (!progressMade_) {
+ throw std::runtime_error("Codec: No forward progress made");
+ }
+ // Throw an exception if there is no progress again next time
+ progressMade_ = false;
+ } else {
+ progressMade_ = true;
+ }
// Handle output state transitions
if (done) {
if (state_ == State::COMPRESS_FLUSH) {
state_ = State::UNCOMPRESS;
}
assertStateIs(State::UNCOMPRESS);
+ size_t const inputSize = input.size();
+ size_t const outputSize = output.size();
bool const done = doUncompressStream(input, output, flushOp);
+ if (!done && inputSize == input.size() && outputSize == output.size()) {
+ if (!progressMade_) {
+ throw std::runtime_error("Codec: no forward progress made");
+ }
+ // Throw an exception if there is no progress again next time
+ progressMade_ = false;
+ } else {
+ progressMade_ = true;
+ }
// Handle output state transitions
if (done) {
state_ = State::END;
IOBuf const* current = data;
ByteRange input{current->data(), current->length()};
StreamCodec::FlushOp flushOp = StreamCodec::FlushOp::NONE;
- for (;;) {
+ bool done = false;
+ while (!done) {
while (input.empty() && current->next() != data) {
current = current->next();
input = {current->data(), current->length()};
if (output.empty()) {
buffer->prependChain(addOutputBuffer(output, kDefaultBufferLength));
}
- size_t const inputSize = input.size();
- size_t const outputSize = output.size();
- bool const done = compressStream(input, output, flushOp);
+ done = compressStream(input, output, flushOp);
if (done) {
DCHECK(input.empty());
DCHECK(flushOp == StreamCodec::FlushOp::END);
DCHECK_EQ(current->next(), data);
- break;
- }
- if (inputSize == input.size() && outputSize == output.size()) {
- throw std::runtime_error("Codec: No forward progress made");
}
}
buffer->prev()->trimEnd(output.size());
IOBuf const* current = data;
ByteRange input{current->data(), current->length()};
StreamCodec::FlushOp flushOp = StreamCodec::FlushOp::NONE;
- for (;;) {
+ bool done = false;
+ while (!done) {
while (input.empty() && current->next() != data) {
current = current->next();
input = {current->data(), current->length()};
if (output.empty()) {
buffer->prependChain(addOutputBuffer(output, defaultBufferLength));
}
- size_t const inputSize = input.size();
- size_t const outputSize = output.size();
- bool const done = uncompressStream(input, output, flushOp);
- if (done) {
- break;
- }
- if (inputSize == input.size() && outputSize == output.size()) {
- throw std::runtime_error("Codec: Truncated data");
- }
+ done = uncompressStream(input, output, flushOp);
}
if (!input.empty()) {
throw std::runtime_error("Codec: Junk after end of data");
codec_->uncompressStream(input, output, StreamCodec::FlushOp::END));
}
-TEST_P(StreamingUnitTest, noForwardProgressOkay) {
+TEST_P(StreamingUnitTest, noForwardProgress) {
auto inBuffer = IOBuf::create(2);
inBuffer->writableData()[0] = 'a';
- inBuffer->writableData()[0] = 'a';
+ inBuffer->writableData()[1] = 'a';
inBuffer->append(2);
- auto input = inBuffer->coalesce();
- auto compressed = codec_->compress(inBuffer.get());
-
+ const auto compressed = codec_->compress(inBuffer.get());
auto outBuffer = IOBuf::create(codec_->maxCompressedLength(2));
- MutableByteRange output{outBuffer->writableTail(), outBuffer->tailroom()};
ByteRange emptyInput;
MutableByteRange emptyOutput;
- // Compress some data to avoid empty data special casing
- if (codec_->needsDataLength()) {
- codec_->resetStream(inBuffer->computeChainDataLength());
- } else {
- codec_->resetStream();
- }
- while (!input.empty()) {
- codec_->compressStream(input, output);
+ const std::array<StreamCodec::FlushOp, 3> flushOps = {{
+ StreamCodec::FlushOp::NONE,
+ StreamCodec::FlushOp::FLUSH,
+ StreamCodec::FlushOp::END,
+ }};
+
+ // No progress is not okay twice in a row for all flush operations when
+ // compressing
+ for (const auto flushOp : flushOps) {
+ if (codec_->needsDataLength()) {
+ codec_->resetStream(inBuffer->computeChainDataLength());
+ } else {
+ codec_->resetStream();
+ }
+ auto input = inBuffer->coalesce();
+ MutableByteRange output = {outBuffer->writableTail(),
+ outBuffer->tailroom()};
+ // Compress some data to avoid empty data special casing
+ while (!input.empty()) {
+ codec_->compressStream(input, output);
+ }
+ EXPECT_FALSE(codec_->compressStream(emptyInput, emptyOutput, flushOp));
+ EXPECT_THROW(
+ codec_->compressStream(emptyInput, emptyOutput, flushOp),
+ std::runtime_error);
}
- // empty input and output is okay for flush NONE and FLUSH.
- codec_->compressStream(emptyInput, emptyOutput);
- codec_->compressStream(emptyInput, emptyOutput, StreamCodec::FlushOp::FLUSH);
- if (codec_->needsDataLength()) {
- codec_->resetStream(inBuffer->computeChainDataLength());
- } else {
+ // No progress is not okay twice in a row for all flush operations when
+ // uncompressing
+ for (const auto flushOp : flushOps) {
codec_->resetStream();
+ auto input = compressed->coalesce();
+ // Remove the last byte so the operation is incomplete
+ input.uncheckedSubtract(1);
+ MutableByteRange output = {inBuffer->writableData(), inBuffer->length()};
+ // Uncompress some data to avoid empty data special casing
+ while (!input.empty()) {
+ EXPECT_FALSE(codec_->uncompressStream(input, output));
+ }
+ EXPECT_FALSE(codec_->uncompressStream(emptyInput, emptyOutput, flushOp));
+ EXPECT_THROW(
+ codec_->uncompressStream(emptyInput, emptyOutput, flushOp),
+ std::runtime_error);
}
- input = inBuffer->coalesce();
- output = {outBuffer->writableTail(), outBuffer->tailroom()};
- while (!input.empty()) {
- codec_->compressStream(input, output);
- }
- // empty input and output is okay for flush END.
- codec_->compressStream(emptyInput, emptyOutput, StreamCodec::FlushOp::END);
-
- codec_->resetStream();
- input = compressed->coalesce();
- input.uncheckedSubtract(1); // Remove last byte so the operation is incomplete
- output = {inBuffer->writableData(), inBuffer->length()};
- // Uncompress some data to avoid empty data special casing
- while (!input.empty()) {
- EXPECT_FALSE(codec_->uncompressStream(input, output));
- }
- // empty input and output is okay for all flush values.
- EXPECT_FALSE(codec_->uncompressStream(emptyInput, emptyOutput));
- EXPECT_FALSE(codec_->uncompressStream(
- emptyInput, emptyOutput, StreamCodec::FlushOp::FLUSH));
- EXPECT_FALSE(codec_->uncompressStream(
- emptyInput, emptyOutput, StreamCodec::FlushOp::END));
}
TEST_P(StreamingUnitTest, stateTransitions) {
- auto inBuffer = IOBuf::create(1);
+ auto inBuffer = IOBuf::create(2);
inBuffer->writableData()[0] = 'a';
- inBuffer->append(1);
+ inBuffer->writableData()[1] = 'a';
+ inBuffer->append(2);
auto compressed = codec_->compress(inBuffer.get());
ByteRange const in = compressed->coalesce();
auto outBuffer = IOBuf::create(codec_->maxCompressedLength(in.size()));