Add streaming API
authorNick Terrell <terrelln@fb.com>
Wed, 24 May 2017 21:59:41 +0000 (14:59 -0700)
committerFacebook Github Bot <facebook-github-bot@users.noreply.github.com>
Wed, 24 May 2017 22:05:53 +0000 (15:05 -0700)
Summary:
Adds a C-style streaming API to `folly/io/Compression.h`, with a zlib-esque interface.
Stacked diffs will add streaming support to zstd, zlib, gzip, lzma, lz4_frame, and automatic codecs.
This interface is targeting advanced users who are building higher level interfaces.
They can use this as a common base so they don't have to reimplement the same code for every codec.

Reviewed By: yfeldblum

Differential Revision: D5026332

fbshipit-source-id: e3abf1767b493c2fef153b895858a3a81b67d989

folly/io/Compression.cpp
folly/io/Compression.h
folly/io/test/CompressionTest.cpp

index 4c02802e0746dfb51aabc4200ef5774184e8a1c0..2c47093bbe13b6f1adbd91a62c304ceb99ef09db 100644 (file)
@@ -176,6 +176,242 @@ std::string Codec::doUncompressString(
   return output;
 }
 
+uint64_t Codec::maxCompressedLength(uint64_t uncompressedLength) const {
+  if (uncompressedLength == 0) {
+    return 0;
+  }
+  return doMaxCompressedLength(uncompressedLength);
+}
+
+Optional<uint64_t> Codec::getUncompressedLength(
+    const folly::IOBuf* data,
+    Optional<uint64_t> uncompressedLength) const {
+  auto const compressedLength = data->computeChainDataLength();
+  if (uncompressedLength == uint64_t(0) || compressedLength == 0) {
+    if (uncompressedLength.value_or(0) != 0 || compressedLength != 0) {
+      throw std::runtime_error("Invalid uncompressed length");
+    }
+    return 0;
+  }
+  return doGetUncompressedLength(data, uncompressedLength);
+}
+
+Optional<uint64_t> Codec::doGetUncompressedLength(
+    const folly::IOBuf*,
+    Optional<uint64_t> uncompressedLength) const {
+  return uncompressedLength;
+}
+
+bool StreamCodec::needsDataLength() const {
+  return doNeedsDataLength();
+}
+
+bool StreamCodec::doNeedsDataLength() const {
+  return false;
+}
+
+void StreamCodec::assertStateIs(State expected) const {
+  if (state_ != expected) {
+    throw std::logic_error(folly::to<std::string>(
+        "Codec: state is ", state_, "; expected state ", expected));
+  }
+}
+
+void StreamCodec::resetStream(Optional<uint64_t> uncompressedLength) {
+  state_ = State::RESET;
+  uncompressedLength_ = uncompressedLength;
+  doResetStream();
+}
+
+bool StreamCodec::compressStream(
+    ByteRange& input,
+    MutableByteRange& output,
+    StreamCodec::FlushOp flushOp) {
+  if (state_ == State::RESET && input.empty()) {
+    if (flushOp == StreamCodec::FlushOp::NONE) {
+      return false;
+    }
+    if (flushOp == StreamCodec::FlushOp::END &&
+        uncompressedLength().value_or(0) != 0) {
+      throw std::runtime_error("Codec: invalid uncompressed length");
+    }
+    return true;
+  }
+  if (state_ == State::RESET && !input.empty() &&
+      uncompressedLength() == uint64_t(0)) {
+    throw std::runtime_error("Codec: invalid uncompressed length");
+  }
+  // Handle input state transitions
+  switch (flushOp) {
+    case StreamCodec::FlushOp::NONE:
+      if (state_ == State::RESET) {
+        state_ = State::COMPRESS;
+      }
+      assertStateIs(State::COMPRESS);
+      break;
+    case StreamCodec::FlushOp::FLUSH:
+      if (state_ == State::RESET || state_ == State::COMPRESS) {
+        state_ = State::COMPRESS_FLUSH;
+      }
+      assertStateIs(State::COMPRESS_FLUSH);
+      break;
+    case StreamCodec::FlushOp::END:
+      if (state_ == State::RESET || state_ == State::COMPRESS) {
+        state_ = State::COMPRESS_END;
+      }
+      assertStateIs(State::COMPRESS_END);
+      break;
+  }
+  bool const done = doCompressStream(input, output, flushOp);
+  // Handle output state transitions
+  if (done) {
+    if (state_ == State::COMPRESS_FLUSH) {
+      state_ = State::COMPRESS;
+    } else if (state_ == State::COMPRESS_END) {
+      state_ = State::END;
+    }
+    // Check internal invariants
+    DCHECK(input.empty());
+    DCHECK(flushOp != StreamCodec::FlushOp::NONE);
+  }
+  return done;
+}
+
+bool StreamCodec::uncompressStream(
+    ByteRange& input,
+    MutableByteRange& output,
+    StreamCodec::FlushOp flushOp) {
+  if (state_ == State::RESET && input.empty()) {
+    if (uncompressedLength().value_or(0) == 0) {
+      return true;
+    }
+    return false;
+  }
+  // Handle input state transitions
+  if (state_ == State::RESET) {
+    state_ = State::UNCOMPRESS;
+  }
+  assertStateIs(State::UNCOMPRESS);
+  bool const done = doUncompressStream(input, output, flushOp);
+  // Handle output state transitions
+  if (done) {
+    state_ = State::END;
+  }
+  return done;
+}
+
+static std::unique_ptr<IOBuf> addOutputBuffer(
+    MutableByteRange& output,
+    uint64_t size) {
+  DCHECK(output.empty());
+  auto buffer = IOBuf::create(size);
+  buffer->append(buffer->capacity());
+  output = {buffer->writableData(), buffer->length()};
+  return buffer;
+}
+
+std::unique_ptr<IOBuf> StreamCodec::doCompress(IOBuf const* data) {
+  uint64_t const uncompressedLength = data->computeChainDataLength();
+  resetStream(uncompressedLength);
+  uint64_t const maxCompressedLen = maxCompressedLength(uncompressedLength);
+
+  auto constexpr kMaxSingleStepLength = uint64_t(64) << 20; // 64 MB
+  auto constexpr kDefaultBufferLength = uint64_t(4) << 20; // 4 MB
+
+  MutableByteRange output;
+  auto buffer = addOutputBuffer(
+      output,
+      maxCompressedLen <= kMaxSingleStepLength ? maxCompressedLen
+                                               : kDefaultBufferLength);
+
+  // Compress the entire IOBuf chain into the IOBuf chain pointed to by buffer
+  IOBuf const* current = data;
+  ByteRange input{current->data(), current->length()};
+  StreamCodec::FlushOp flushOp = StreamCodec::FlushOp::NONE;
+  for (;;) {
+    while (input.empty() && current->next() != data) {
+      current = current->next();
+      input = {current->data(), current->length()};
+    }
+    if (current->next() == data) {
+      // This is the last input buffer so end the stream
+      flushOp = StreamCodec::FlushOp::END;
+    }
+    if (output.empty()) {
+      buffer->prependChain(addOutputBuffer(output, kDefaultBufferLength));
+    }
+    bool const done = compressStream(input, output, flushOp);
+    if (done) {
+      DCHECK(input.empty());
+      DCHECK(flushOp == StreamCodec::FlushOp::END);
+      DCHECK_EQ(current->next(), data);
+      break;
+    }
+  }
+  buffer->prev()->trimEnd(output.size());
+  return buffer;
+}
+
+static uint64_t computeBufferLength(
+    uint64_t const compressedLength,
+    uint64_t const blockSize) {
+  uint64_t constexpr kMaxBufferLength = uint64_t(4) << 20; // 4 MiB
+  uint64_t const goodBufferSize = 4 * std::max(blockSize, compressedLength);
+  return std::min(goodBufferSize, kMaxBufferLength);
+}
+
+std::unique_ptr<IOBuf> StreamCodec::doUncompress(
+    IOBuf const* data,
+    Optional<uint64_t> uncompressedLength) {
+  auto constexpr kMaxSingleStepLength = uint64_t(64) << 20; // 64 MB
+  auto constexpr kBlockSize = uint64_t(128) << 10;
+  auto const defaultBufferLength =
+      computeBufferLength(data->computeChainDataLength(), kBlockSize);
+
+  uncompressedLength = getUncompressedLength(data, uncompressedLength);
+  resetStream(uncompressedLength);
+
+  MutableByteRange output;
+  auto buffer = addOutputBuffer(
+      output,
+      (uncompressedLength && *uncompressedLength <= kMaxSingleStepLength
+           ? *uncompressedLength
+           : defaultBufferLength));
+
+  // Uncompress the entire IOBuf chain into the IOBuf chain pointed to by buffer
+  IOBuf const* current = data;
+  ByteRange input{current->data(), current->length()};
+  StreamCodec::FlushOp flushOp = StreamCodec::FlushOp::NONE;
+  for (;;) {
+    while (input.empty() && current->next() != data) {
+      current = current->next();
+      input = {current->data(), current->length()};
+    }
+    if (current->next() == data) {
+      // Tell the uncompressor there is no more input (it may optimize)
+      flushOp = StreamCodec::FlushOp::END;
+    }
+    if (output.empty()) {
+      buffer->prependChain(addOutputBuffer(output, defaultBufferLength));
+    }
+    bool const done = uncompressStream(input, output, flushOp);
+    if (done) {
+      break;
+    }
+  }
+  if (!input.empty()) {
+    throw std::runtime_error("Codec: Junk after end of data");
+  }
+
+  buffer->prev()->trimEnd(output.size());
+  if (uncompressedLength &&
+      *uncompressedLength != buffer->computeChainDataLength()) {
+    throw std::runtime_error("Codec: invalid uncompressed length");
+  }
+
+  return buffer;
+}
+
 namespace {
 
 /**
@@ -187,6 +423,7 @@ class NoCompressionCodec final : public Codec {
   explicit NoCompressionCodec(int level, CodecType type);
 
  private:
+  uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
   std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
   std::unique_ptr<IOBuf> doUncompress(
       const IOBuf* data,
@@ -212,6 +449,11 @@ NoCompressionCodec::NoCompressionCodec(int level, CodecType type)
   }
 }
 
+uint64_t NoCompressionCodec::doMaxCompressedLength(
+    uint64_t uncompressedLength) const {
+  return uncompressedLength;
+}
+
 std::unique_ptr<IOBuf> NoCompressionCodec::doCompress(
     const IOBuf* data) {
   return data->clone();
@@ -288,14 +530,6 @@ prefixToStringLE(T prefix, uint64_t n = sizeof(T)) {
   memcpy(&result[0], &prefix, n);
   return result;
 }
-
-static uint64_t computeBufferLength(
-    uint64_t const compressedLength,
-    uint64_t const blockSize) {
-  uint64_t constexpr kMaxBufferLength = uint64_t(4) << 20; // 4 MiB
-  uint64_t const goodBufferSize = 4 * std::max(blockSize, compressedLength);
-  return std::min(goodBufferSize, kMaxBufferLength);
-}
 } // namespace
 
 #if FOLLY_HAVE_LIBLZ4
@@ -311,6 +545,7 @@ class LZ4Codec final : public Codec {
  private:
   bool doNeedsUncompressedLength() const override;
   uint64_t doMaxUncompressedLength() const override;
+  uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
 
   bool encodeSize() const { return type() == CodecType::LZ4_VARINT_SIZE; }
 
@@ -360,6 +595,11 @@ uint64_t LZ4Codec::doMaxUncompressedLength() const {
   return LZ4_MAX_INPUT_SIZE;
 }
 
+uint64_t LZ4Codec::doMaxCompressedLength(uint64_t uncompressedLength) const {
+  return LZ4_compressBound(uncompressedLength) +
+      (encodeSize() ? kMaxVarintLength64 : 0);
+}
+
 std::unique_ptr<IOBuf> LZ4Codec::doCompress(const IOBuf* data) {
   IOBuf clone;
   if (data->isChained()) {
@@ -368,8 +608,7 @@ std::unique_ptr<IOBuf> LZ4Codec::doCompress(const IOBuf* data) {
     data = &clone;
   }
 
-  uint32_t extraSize = encodeSize() ? kMaxVarintLength64 : 0;
-  auto out = IOBuf::create(extraSize + LZ4_compressBound(data->length()));
+  auto out = IOBuf::create(maxCompressedLength(data->length()));
   if (encodeSize()) {
     encodeVarintToIOBuf(data->length(), out.get());
   }
@@ -452,6 +691,8 @@ class LZ4FrameCodec final : public Codec {
       const override;
 
  private:
+  uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
+
   std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
   std::unique_ptr<IOBuf> doUncompress(
       const IOBuf* data,
@@ -481,6 +722,14 @@ bool LZ4FrameCodec::canUncompress(const IOBuf* data, Optional<uint64_t>) const {
   return dataStartsWithLE(data, kLZ4FrameMagicLE);
 }
 
+uint64_t LZ4FrameCodec::doMaxCompressedLength(
+    uint64_t uncompressedLength) const {
+  LZ4F_preferences_t prefs{};
+  prefs.compressionLevel = level_;
+  prefs.frameInfo.contentSize = uncompressedLength;
+  return LZ4F_compressFrameBound(uncompressedLength, &prefs);
+}
+
 static size_t lz4FrameThrowOnError(size_t code) {
   if (LZ4F_isError(code)) {
     throw std::runtime_error(
@@ -535,7 +784,7 @@ std::unique_ptr<IOBuf> LZ4FrameCodec::doCompress(const IOBuf* data) {
   prefs.compressionLevel = level_;
   prefs.frameInfo.contentSize = uncompressedLength;
   // Compress
-  auto buf = IOBuf::create(LZ4F_compressFrameBound(uncompressedLength, &prefs));
+  auto buf = IOBuf::create(maxCompressedLength(uncompressedLength));
   const size_t written = lz4FrameThrowOnError(LZ4F_compressFrame(
       buf->writableTail(),
       buf->tailroom(),
@@ -659,6 +908,7 @@ class SnappyCodec final : public Codec {
 
  private:
   uint64_t doMaxUncompressedLength() const override;
+  uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
   std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
   std::unique_ptr<IOBuf> doUncompress(
       const IOBuf* data,
@@ -688,10 +938,13 @@ uint64_t SnappyCodec::doMaxUncompressedLength() const {
   return std::numeric_limits<uint32_t>::max();
 }
 
+uint64_t SnappyCodec::doMaxCompressedLength(uint64_t uncompressedLength) const {
+  return snappy::MaxCompressedLength(uncompressedLength);
+}
+
 std::unique_ptr<IOBuf> SnappyCodec::doCompress(const IOBuf* data) {
   IOBufSnappySource source(data);
-  auto out =
-    IOBuf::create(snappy::MaxCompressedLength(source.Available()));
+  auto out = IOBuf::create(maxCompressedLength(source.Available()));
 
   snappy::UncheckedByteArraySink sink(reinterpret_cast<char*>(
       out->writableTail()));
@@ -748,6 +1001,7 @@ class ZlibCodec final : public Codec {
       const override;
 
  private:
+  uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
   std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
   std::unique_ptr<IOBuf> doUncompress(
       const IOBuf* data,
@@ -819,6 +1073,10 @@ bool ZlibCodec::canUncompress(const IOBuf* data, Optional<uint64_t>) const {
   }
 }
 
+uint64_t ZlibCodec::doMaxCompressedLength(uint64_t uncompressedLength) const {
+  return deflateBound(nullptr, uncompressedLength);
+}
+
 std::unique_ptr<Codec> ZlibCodec::create(int level, CodecType type) {
   return std::make_unique<ZlibCodec>(level, type);
 }
@@ -1075,6 +1333,7 @@ class LZMA2Codec final : public Codec {
  private:
   bool doNeedsUncompressedLength() const override;
   uint64_t doMaxUncompressedLength() const override;
+  uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
 
   bool encodeSize() const { return type() == CodecType::LZMA2_VARINT_SIZE; }
 
@@ -1141,6 +1400,11 @@ uint64_t LZMA2Codec::doMaxUncompressedLength() const {
   return uint64_t(1) << 63;
 }
 
+uint64_t LZMA2Codec::doMaxCompressedLength(uint64_t uncompressedLength) const {
+  return lzma_stream_buffer_bound(uncompressedLength) +
+      (encodeSize() ? kMaxVarintLength64 : 0);
+}
+
 std::unique_ptr<IOBuf> LZMA2Codec::addOutputBuffer(
     lzma_stream* stream,
     size_t length) {
@@ -1334,6 +1598,7 @@ class ZSTDCodec final : public Codec {
 
  private:
   bool doNeedsUncompressedLength() const override;
+  uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
   std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
   std::unique_ptr<IOBuf> doUncompress(
       const IOBuf* data,
@@ -1380,6 +1645,10 @@ bool ZSTDCodec::doNeedsUncompressedLength() const {
   return false;
 }
 
+uint64_t ZSTDCodec::doMaxCompressedLength(uint64_t uncompressedLength) const {
+  return ZSTD_compressBound(uncompressedLength);
+}
+
 void zstdThrowIfError(size_t rc) {
   if (!ZSTD_isError(rc)) {
     return;
@@ -1414,7 +1683,8 @@ std::unique_ptr<IOBuf> ZSTDCodec::doCompress(const IOBuf* data) {
   zstdThrowIfError(rc);
 
   Cursor cursor(data);
-  auto result = IOBuf::createCombined(ZSTD_compressBound(cursor.totalLength()));
+  auto result =
+      IOBuf::createCombined(maxCompressedLength(cursor.totalLength()));
 
   ZSTD_outBuffer out;
   out.dst = result->writableTail();
@@ -1557,6 +1827,7 @@ class Bzip2Codec final : public Codec {
       const override;
 
  private:
+  uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
   std::unique_ptr<IOBuf> doCompress(IOBuf const* data) override;
   std::unique_ptr<IOBuf> doUncompress(
       IOBuf const* data,
@@ -1602,6 +1873,14 @@ bool Bzip2Codec::canUncompress(IOBuf const* data, Optional<uint64_t>) const {
   return dataStartsWithLE(data, kBzip2MagicLE, kBzip2MagicBytes);
 }
 
+uint64_t Bzip2Codec::doMaxCompressedLength(uint64_t uncompressedLength) const {
+  // http://www.bzip.org/1.0.5/bzip2-manual-1.0.5.html#bzbufftobuffcompress
+  //   To guarantee that the compressed data will fit in its buffer, allocate an
+  //   output buffer of size 1% larger than the uncompressed data, plus six
+  //   hundred extra bytes.
+  return uncompressedLength + uncompressedLength / 100 + 600;
+}
+
 static bz_stream createBzStream() {
   bz_stream stream;
   stream.bzalloc = nullptr;
@@ -1626,14 +1905,6 @@ static int bzCheck(int const rc) {
   }
 }
 
-static uint64_t bzCompressBound(uint64_t const uncompressedLength) {
-  // http://www.bzip.org/1.0.5/bzip2-manual-1.0.5.html#bzbufftobuffcompress
-  //   To guarantee that the compressed data will fit in its buffer, allocate an
-  //   output buffer of size 1% larger than the uncompressed data, plus six
-  //   hundred extra bytes.
-  return uncompressedLength + uncompressedLength / 100 + 600;
-}
-
 static std::unique_ptr<IOBuf> addOutputBuffer(
     bz_stream* stream,
     uint64_t const bufferLength) {
@@ -1657,14 +1928,14 @@ std::unique_ptr<IOBuf> Bzip2Codec::doCompress(IOBuf const* data) {
   };
 
   uint64_t const uncompressedLength = data->computeChainDataLength();
-  uint64_t const maxCompressedLength = bzCompressBound(uncompressedLength);
+  uint64_t const maxCompressedLen = maxCompressedLength(uncompressedLength);
   uint64_t constexpr kMaxSingleStepLength = uint64_t(64) << 20; // 64 MiB
   uint64_t constexpr kDefaultBufferLength = uint64_t(4) << 20;
 
   auto out = addOutputBuffer(
       &stream,
-      maxCompressedLength <= kMaxSingleStepLength ? maxCompressedLength
-                                                  : kDefaultBufferLength);
+      maxCompressedLen <= kMaxSingleStepLength ? maxCompressedLen
+                                               : kDefaultBufferLength);
 
   for (auto range : *data) {
     while (!range.empty()) {
@@ -1766,6 +2037,10 @@ class AutomaticCodec final : public Codec {
   bool doNeedsUncompressedLength() const override;
   uint64_t doMaxUncompressedLength() const override;
 
+  uint64_t doMaxCompressedLength(uint64_t) const override {
+    throw std::runtime_error(
+        "AutomaticCodec error: maxCompressedLength() not supported.");
+  }
   std::unique_ptr<IOBuf> doCompress(const IOBuf*) override {
     throw std::runtime_error("AutomaticCodec error: compress() not supported.");
   }
@@ -1909,93 +2184,112 @@ std::unique_ptr<IOBuf> AutomaticCodec::doUncompress(
   throw std::runtime_error("AutomaticCodec error: Unknown compressed data");
 }
 
-}  // namespace
+using CodecFactory = std::unique_ptr<Codec> (*)(int, CodecType);
+using StreamCodecFactory = std::unique_ptr<StreamCodec> (*)(int, CodecType);
+struct Factory {
+  CodecFactory codec;
+  StreamCodecFactory stream;
+};
 
-typedef std::unique_ptr<Codec> (*CodecFactory)(int, CodecType);
-static constexpr CodecFactory
+constexpr Factory
     codecFactories[static_cast<size_t>(CodecType::NUM_CODEC_TYPES)] = {
-        nullptr, // USER_DEFINED
-        NoCompressionCodec::create,
+        {}, // USER_DEFINED
+        {NoCompressionCodec::create, nullptr},
 
 #if FOLLY_HAVE_LIBLZ4
-        LZ4Codec::create,
+        {LZ4Codec::create, nullptr},
 #else
-        nullptr,
+        {},
 #endif
 
 #if FOLLY_HAVE_LIBSNAPPY
-        SnappyCodec::create,
+        {SnappyCodec::create, nullptr},
 #else
-        nullptr,
+        {},
 #endif
 
 #if FOLLY_HAVE_LIBZ
-        ZlibCodec::create,
+        {ZlibCodec::create, nullptr},
 #else
-        nullptr,
+        {},
 #endif
 
 #if FOLLY_HAVE_LIBLZ4
-        LZ4Codec::create,
+        {LZ4Codec::create, nullptr},
 #else
-        nullptr,
+        {},
 #endif
 
 #if FOLLY_HAVE_LIBLZMA
-        LZMA2Codec::create,
-        LZMA2Codec::create,
+        {LZMA2Codec::create, nullptr},
+        {LZMA2Codec::create, nullptr},
 #else
-        nullptr,
-        nullptr,
+        {},
+        {},
 #endif
 
 #if FOLLY_HAVE_LIBZSTD
-        ZSTDCodec::create,
+        {ZSTDCodec::create, nullptr},
 #else
-        nullptr,
+        {},
 #endif
 
 #if FOLLY_HAVE_LIBZ
-        ZlibCodec::create,
+        {ZlibCodec::create, nullptr},
 #else
-        nullptr,
+        {},
 #endif
 
 #if (FOLLY_HAVE_LIBLZ4 && LZ4_VERSION_NUMBER >= 10301)
-        LZ4FrameCodec::create,
+        {LZ4FrameCodec::create, nullptr},
 #else
-        nullptr,
+        {},
 #endif
 
 #if FOLLY_HAVE_LIBBZ2
-        Bzip2Codec::create,
+        {Bzip2Codec::create, nullptr},
 #else
-        nullptr
+        {},
 #endif
 };
 
-bool hasCodec(CodecType type) {
-  size_t idx = static_cast<size_t>(type);
+Factory const& getFactory(CodecType type) {
+  size_t const idx = static_cast<size_t>(type);
   if (idx >= static_cast<size_t>(CodecType::NUM_CODEC_TYPES)) {
     throw std::invalid_argument(
         to<std::string>("Compression type ", idx, " invalid"));
   }
-  return codecFactories[idx] != nullptr;
+  return codecFactories[idx];
+}
+} // namespace
+
+bool hasCodec(CodecType type) {
+  return getFactory(type).codec != nullptr;
 }
 
 std::unique_ptr<Codec> getCodec(CodecType type, int level) {
-  size_t idx = static_cast<size_t>(type);
-  if (idx >= static_cast<size_t>(CodecType::NUM_CODEC_TYPES)) {
+  auto const factory = getFactory(type).codec;
+  if (!factory) {
     throw std::invalid_argument(
-        to<std::string>("Compression type ", idx, " invalid"));
+        to<std::string>("Compression type ", type, " not supported"));
   }
-  auto factory = codecFactories[idx];
+  auto codec = (*factory)(level, type);
+  DCHECK(codec->type() == type);
+  return codec;
+}
+
+bool hasStreamCodec(CodecType type) {
+  return getFactory(type).stream != nullptr;
+}
+
+std::unique_ptr<StreamCodec> getStreamCodec(CodecType type, int level) {
+  auto const factory = getFactory(type).stream;
   if (!factory) {
-    throw std::invalid_argument(to<std::string>(
-        "Compression type ", idx, " not supported"));
+    throw std::invalid_argument(
+        to<std::string>("Compression type ", type, " not supported"));
   }
   auto codec = (*factory)(level, type);
-  DCHECK_EQ(static_cast<size_t>(codec->type()), idx);
+  DCHECK(codec->type() == type);
   return codec;
 }
 
index a5ac58c2e2c346f9cf6d024d49c9653080e9c5ae..e9bcd2a5a38defaf49b7fe2ffafa77d183203552 100644 (file)
@@ -107,11 +107,12 @@ class Codec {
  public:
   virtual ~Codec() { }
 
+  static constexpr uint64_t UNLIMITED_UNCOMPRESSED_LENGTH = uint64_t(-1);
   /**
    * Return the maximum length of data that may be compressed with this codec.
    * NO_COMPRESSION and ZLIB support arbitrary lengths;
    * LZ4 supports up to 1.9GiB; SNAPPY supports up to 4GiB.
-   * May return UNLIMITED_UNCOMPRESSED_LENGTH (uint64_t(-1)) if unlimited.
+   * May return UNLIMITED_UNCOMPRESSED_LENGTH if unlimited.
    */
   uint64_t maxUncompressedLength() const;
 
@@ -154,8 +155,6 @@ class Codec {
    * Regardless of the behavior of the underlying compressor, uncompressing
    * an empty IOBuf chain will return an empty IOBuf chain.
    */
-  static constexpr uint64_t UNLIMITED_UNCOMPRESSED_LENGTH = uint64_t(-1);
-
   std::unique_ptr<IOBuf> uncompress(
       const IOBuf* data,
       folly::Optional<uint64_t> uncompressedLength = folly::none);
@@ -169,6 +168,24 @@ class Codec {
       StringPiece data,
       folly::Optional<uint64_t> uncompressedLength = folly::none);
 
+  /**
+   * Returns a bound on the maximum compressed length when compressing data with
+   * the given uncompressed length.
+   */
+  uint64_t maxCompressedLength(uint64_t uncompressedLength) const;
+
+  /**
+   * Extracts the uncompressed length from the compressed data if possible.
+   * If the codec doesn't store the uncompressed length, or the data is
+   * corrupted it returns the given uncompressedLength.
+   * If the uncompressed length is stored in the compressed data and
+   * uncompressedLength is not none and they do not match a std::runtime_error
+   * is thrown.
+   */
+  folly::Optional<uint64_t> getUncompressedLength(
+      const folly::IOBuf* data,
+      folly::Optional<uint64_t> uncompressedLength = folly::none) const;
+
  protected:
   explicit Codec(CodecType type);
 
@@ -209,9 +226,171 @@ class Codec {
       StringPiece data,
       folly::Optional<uint64_t> uncompressedLength);
 
+  virtual uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const = 0;
+  // default: returns the passed uncompressedLength.
+  virtual folly::Optional<uint64_t> doGetUncompressedLength(
+      const folly::IOBuf* data,
+      folly::Optional<uint64_t> uncompressedLength) const;
+
   CodecType type_;
 };
 
+class StreamCodec : public Codec {
+ public:
+  virtual ~StreamCodec() {}
+
+  /**
+   * Does the codec need the data length before compression streaming?
+   */
+  bool needsDataLength() const;
+
+  /*****************************************************************************
+   * Streaming API
+   *****************************************************************************
+   * A low-level stateful streaming API.
+   * Streaming operations can be started in two ways:
+   *   1. From a clean Codec on which no non-const methods have been called.
+   *   2. A call to resetStream(), which will reset any codec to a clean state.
+   * After a streaming operation has begun, either compressStream() or
+   * uncompressStream() must be called until the streaming operation ends.
+   * compressStream() ends when it returns true with flushOp END.
+   * uncompressStream() ends when it returns true. At this point the codec
+   * may be reused by calling resetStream().
+   *
+   * compress() and uncompress() can be called at any time, but they interrupt
+   * any ongoing streaming operations (state is lost and resetStream() must be
+   * called before another streaming operation).
+   */
+
+  /**
+   * Reset the state of the codec, and set the uncompressed length for the next
+   * streaming operation. If uncompressedLength is not none it must be exactly
+   * the uncompressed length. compressStream() must be passed exactly
+   * uncompressedLength input bytes before the stream is ended.
+   * uncompressStream() must be passed a compressed frame that uncompresses to
+   * uncompressedLength.
+   */
+  void resetStream(folly::Optional<uint64_t> uncompressedLength = folly::none);
+
+  enum class FlushOp { NONE, FLUSH, END };
+
+  /**
+   * Compresses some data from the input buffer and writes the compressed data
+   * into the output buffer. It may read input without producing any output,
+   * except when forced to flush.
+   *
+   * The input buffer is advanced to point to the range of data that hasn't yet
+   * been read. Compression will resume at this point for the next call to
+   * compressStream(). The output buffer is advanced one byte past the last byte
+   * written.
+   *
+   * The default flushOp is NONE, which allows compressStream() complete
+   * discretion in how much data to gather before writing any output.
+   *
+   * If flushOp is END, all pending and input data is flushed to the output
+   * buffer, and the frame is ended. compressStream() must be called with the
+   * same input and flushOp END until it returns true. At this point the caller
+   * must call resetStream() to use the codec again.
+   *
+   * If flushOp is FLUSH, all pending and input data is flushed to the output
+   * buffer, but the frame is not ended. compressStream() must be called with
+   * the same input and flushOp END until it returns true. At this point the
+   * caller can continue to compressStream() with any input data and flushOp.
+   * The uncompressor, if passed all the produced output data, will be able to
+   * uncompress all the input data passed to compressStream() so far. Excessive
+   * use of flushOp FLUSH will deteriorate compression ratio. This is useful for
+   * stateful streaming across a network. Most users don't need to use this
+   * flushOp.
+   *
+   * A std::logic_error is thrown on incorrect usage of the API.
+   * A std::runtime_error is thrown upon error conditions.
+   */
+  bool compressStream(
+      folly::ByteRange& input,
+      folly::MutableByteRange& output,
+      FlushOp flushOp = StreamCodec::FlushOp::NONE);
+
+  /**
+   * Uncompresses some data from the input buffer and writes the uncompressed
+   * data into the output buffer. It may read input without producing any
+   * output.
+   *
+   * The input buffer is advanced to point to the range of data that hasn't yet
+   * been read. Uncompression will resume at this point for the next call to
+   * uncompressStream(). The output buffer is advanced one byte past the last
+   * byte written.
+   *
+   * The default flushOp is NONE, which allows uncompressStream() complete
+   * discretion in how much output data to flush. The uncompressor may not make
+   * maximum forward progress, but will make some forward progress when
+   * possible.
+   *
+   * If flushOp is END, the caller guarantees that no more input will be
+   * presented to uncompressStream(). uncompressStream() must be called with the
+   * same input and flushOp END until it returns true. This is not mandatory,
+   * but if the input is all available in one buffer, and there is enough output
+   * space to write the entire frame, codecs can uncompress faster.
+   *
+   * If flushOp is FLUSH, uncompressStream() is guaranteed to make the maximum
+   * amount of forward progress possible. When using this flushOp and
+   * uncompressStream() returns with `!output.empty()` the caller knows that all
+   * pending output has been flushed. This is useful for stateful streaming
+   * across a network, and it should be used in conjunction with
+   * compressStream() with flushOp FLUSH. Most users don't need to use this
+   * flushOp.
+   *
+   * Returns true at the end of a frame. At this point resetStream() must be
+   * called to reuse the codec.
+   */
+  bool uncompressStream(
+      folly::ByteRange& input,
+      folly::MutableByteRange& output,
+      FlushOp flushOp = StreamCodec::FlushOp::NONE);
+
+ protected:
+  explicit StreamCodec(CodecType type) : Codec(type) {}
+
+  // Returns the uncompressed length last passed to resetStream() or none if it
+  // hasn't been called yet.
+  folly::Optional<uint64_t> uncompressedLength() const {
+    return uncompressedLength_;
+  }
+
+ private:
+  // default: Implemented using the streaming API.
+  std::unique_ptr<IOBuf> doCompress(const folly::IOBuf* data) override;
+  virtual std::unique_ptr<IOBuf> doUncompress(
+      const folly::IOBuf* data,
+      folly::Optional<uint64_t> uncompressedLength) override;
+
+  // default: Returns false
+  virtual bool doNeedsDataLength() const;
+  virtual void doResetStream() = 0;
+  virtual bool doCompressStream(
+      folly::ByteRange& input,
+      folly::MutableByteRange& output,
+      FlushOp flushOp) = 0;
+  virtual bool doUncompressStream(
+      folly::ByteRange& input,
+      folly::MutableByteRange& output,
+      FlushOp flushOp) = 0;
+
+  enum class State {
+    RESET,
+    COMPRESS,
+    COMPRESS_FLUSH,
+    COMPRESS_END,
+    UNCOMPRESS,
+    END,
+  };
+  void assertStateIs(State expected) const;
+
+  CodecType type_;
+  State state_{State::RESET};
+  ByteRange previousInput_{};
+  folly::Optional<uint64_t> uncompressedLength_{};
+};
+
 constexpr int COMPRESSION_LEVEL_FASTEST = -1;
 constexpr int COMPRESSION_LEVEL_DEFAULT = -2;
 constexpr int COMPRESSION_LEVEL_BEST = -3;
@@ -232,8 +411,29 @@ constexpr int COMPRESSION_LEVEL_BEST = -3;
  * decompress all data compressed with the a codec of the same type, regardless
  * of compression level.
  */
-std::unique_ptr<Codec> getCodec(CodecType type,
-                                int level = COMPRESSION_LEVEL_DEFAULT);
+std::unique_ptr<Codec> getCodec(
+    CodecType type,
+    int level = COMPRESSION_LEVEL_DEFAULT);
+
+/**
+ * Return a codec for the given type. Throws on error.  The level
+ * is a non-negative codec-dependent integer indicating the level of
+ * compression desired, or one of the following constants:
+ *
+ * COMPRESSION_LEVEL_FASTEST is fastest (uses least CPU / memory,
+ *   worst compression)
+ * COMPRESSION_LEVEL_DEFAULT is the default (likely a tradeoff between
+ *   FASTEST and BEST)
+ * COMPRESSION_LEVEL_BEST is the best compression (uses most CPU / memory,
+ *   best compression)
+ *
+ * When decompressing, the compression level is ignored. All codecs will
+ * decompress all data compressed with the a codec of the same type, regardless
+ * of compression level.
+ */
+std::unique_ptr<StreamCodec> getStreamCodec(
+    CodecType type,
+    int level = COMPRESSION_LEVEL_DEFAULT);
 
 /**
  * Returns a codec that can uncompress any of the given codec types as well as
@@ -262,4 +462,8 @@ std::unique_ptr<Codec> getAutoUncompressionCodec(
  */
 bool hasCodec(CodecType type);
 
-}}  // namespaces
+/**
+ * Check if a specified codec is supported and supports streaming.
+ */
+bool hasStreamCodec(CodecType type);
+}} // namespaces
index e8b9312eac76f26c330ad3c4ac4e316003f18294..d1d3a177e4ce1dae0f910294a624153ffd5b1e39 100644 (file)
 
 #include <folly/io/Compression.h>
 
+#include <algorithm>
 #include <random>
 #include <set>
 #include <thread>
 #include <unordered_map>
+#include <utility>
 
 #include <boost/noncopyable.hpp>
 #include <glog/logging.h>
@@ -147,6 +149,19 @@ static std::vector<CodecType> availableCodecs() {
   return codecs;
 }
 
+static std::vector<CodecType> availableStreamCodecs() {
+  std::vector<CodecType> codecs;
+
+  for (size_t i = 0; i < static_cast<size_t>(CodecType::NUM_CODEC_TYPES); ++i) {
+    auto type = static_cast<CodecType>(i);
+    if (hasStreamCodec(type)) {
+      codecs.push_back(type);
+    }
+  }
+
+  return codecs;
+}
+
 TEST(CompressionTestNeedsUncompressedLength, Simple) {
   static const struct { CodecType type; bool needsUncompressedLength; }
     expectations[] = {
@@ -399,6 +414,422 @@ INSTANTIATE_TEST_CASE_P(
             CodecType::BZIP2,
         })));
 
+class StreamingUnitTest : public testing::TestWithParam<CodecType> {
+ protected:
+  void SetUp() override {
+    codec_ = getStreamCodec(GetParam());
+  }
+
+  std::unique_ptr<StreamCodec> codec_;
+};
+
+TEST_P(StreamingUnitTest, maxCompressedLength) {
+  EXPECT_EQ(0, codec_->maxCompressedLength(0));
+  for (uint64_t const length : {1, 10, 100, 1000, 10000, 100000, 1000000}) {
+    EXPECT_GE(codec_->maxCompressedLength(length), length);
+  }
+}
+
+TEST_P(StreamingUnitTest, getUncompressedLength) {
+  auto const empty = IOBuf::create(0);
+  EXPECT_EQ(uint64_t(0), codec_->getUncompressedLength(empty.get()));
+  EXPECT_EQ(uint64_t(0), codec_->getUncompressedLength(empty.get(), 0));
+
+  auto const data = IOBuf::wrapBuffer(randomDataHolder.data(100));
+  auto const compressed = codec_->compress(data.get());
+
+  EXPECT_ANY_THROW(codec_->getUncompressedLength(data.get(), 0));
+  if (auto const length = codec_->getUncompressedLength(data.get())) {
+    EXPECT_EQ(100, *length);
+  }
+  EXPECT_EQ(uint64_t(100), codec_->getUncompressedLength(data.get(), 100));
+  // If the uncompressed length is stored in the frame, then make sure it throws
+  // when it is given the wrong length.
+  if (codec_->getUncompressedLength(data.get()) == uint64_t(100)) {
+    EXPECT_ANY_THROW(codec_->getUncompressedLength(data.get(), 200));
+  }
+}
+
+TEST_P(StreamingUnitTest, emptyData) {
+  ByteRange input{};
+  auto buffer = IOBuf::create(1);
+  buffer->append(buffer->capacity());
+  MutableByteRange output{};
+
+  // Test compressing empty data in one pass
+  EXPECT_TRUE(codec_->compressStream(input, output, StreamCodec::FlushOp::END));
+  codec_->resetStream(0);
+  EXPECT_TRUE(codec_->compressStream(input, output, StreamCodec::FlushOp::END));
+  codec_->resetStream();
+  output = {buffer->writableData(), buffer->length()};
+  EXPECT_TRUE(codec_->compressStream(input, output, StreamCodec::FlushOp::END));
+  EXPECT_EQ(buffer->length(), output.size());
+
+  // Test compressing empty data with multiple calls to compressStream()
+  codec_->resetStream();
+  output = {};
+  EXPECT_FALSE(codec_->compressStream(input, output));
+  EXPECT_TRUE(
+      codec_->compressStream(input, output, StreamCodec::FlushOp::FLUSH));
+  EXPECT_TRUE(codec_->compressStream(input, output, StreamCodec::FlushOp::END));
+  codec_->resetStream();
+  output = {buffer->writableData(), buffer->length()};
+  EXPECT_FALSE(codec_->compressStream(input, output));
+  EXPECT_TRUE(
+      codec_->compressStream(input, output, StreamCodec::FlushOp::FLUSH));
+  EXPECT_TRUE(codec_->compressStream(input, output, StreamCodec::FlushOp::END));
+  EXPECT_EQ(buffer->length(), output.size());
+
+  // Test uncompressing empty data
+  output = {};
+  codec_->resetStream();
+  EXPECT_TRUE(codec_->uncompressStream(input, output));
+  codec_->resetStream();
+  EXPECT_TRUE(
+      codec_->uncompressStream(input, output, StreamCodec::FlushOp::FLUSH));
+  codec_->resetStream();
+  EXPECT_TRUE(
+      codec_->uncompressStream(input, output, StreamCodec::FlushOp::END));
+  codec_->resetStream(0);
+  EXPECT_TRUE(codec_->uncompressStream(input, output));
+  codec_->resetStream(0);
+  EXPECT_TRUE(
+      codec_->uncompressStream(input, output, StreamCodec::FlushOp::FLUSH));
+  codec_->resetStream(0);
+  EXPECT_TRUE(
+      codec_->uncompressStream(input, output, StreamCodec::FlushOp::END));
+}
+
+TEST_P(StreamingUnitTest, noForwardProgressOkay) {
+  auto inBuffer = IOBuf::create(2);
+  inBuffer->writableData()[0] = 'a';
+  inBuffer->writableData()[0] = 'a';
+  inBuffer->append(2);
+  auto input = inBuffer->coalesce();
+  auto compressed = codec_->compress(inBuffer.get());
+
+  auto outBuffer = IOBuf::create(codec_->maxCompressedLength(2));
+  MutableByteRange output{outBuffer->writableTail(), outBuffer->tailroom()};
+
+  ByteRange emptyInput;
+  MutableByteRange emptyOutput;
+
+  // Compress some data to avoid empty data special casing
+  codec_->resetStream();
+  while (!input.empty()) {
+    codec_->compressStream(input, output);
+  }
+  // empty input and output is okay for flush NONE and FLUSH.
+  codec_->compressStream(emptyInput, emptyOutput);
+  codec_->compressStream(emptyInput, emptyOutput, StreamCodec::FlushOp::FLUSH);
+
+  codec_->resetStream();
+  input = inBuffer->coalesce();
+  output = {outBuffer->writableTail(), outBuffer->tailroom()};
+  while (!input.empty()) {
+    codec_->compressStream(input, output);
+  }
+  // empty input and output is okay for flush END.
+  codec_->compressStream(emptyInput, emptyOutput, StreamCodec::FlushOp::END);
+
+  codec_->resetStream();
+  input = compressed->coalesce();
+  input.uncheckedSubtract(1); // Remove last byte so the operation is incomplete
+  output = {inBuffer->writableData(), inBuffer->length()};
+  // Uncompress some data to avoid empty data special casing
+  while (!input.empty()) {
+    EXPECT_FALSE(codec_->uncompressStream(input, output));
+  }
+  // empty input and output is okay for all flush values.
+  EXPECT_FALSE(codec_->uncompressStream(emptyInput, emptyOutput));
+  EXPECT_FALSE(codec_->uncompressStream(
+      emptyInput, emptyOutput, StreamCodec::FlushOp::FLUSH));
+  EXPECT_FALSE(codec_->uncompressStream(
+      emptyInput, emptyOutput, StreamCodec::FlushOp::END));
+}
+
+TEST_P(StreamingUnitTest, stateTransitions) {
+  auto inBuffer = IOBuf::create(1);
+  inBuffer->writableData()[0] = 'a';
+  inBuffer->append(1);
+  auto compressed = codec_->compress(inBuffer.get());
+  ByteRange const in = compressed->coalesce();
+  auto outBuffer = IOBuf::create(codec_->maxCompressedLength(in.size()));
+  MutableByteRange const out{outBuffer->writableTail(), outBuffer->tailroom()};
+
+  auto compress = [&](
+      StreamCodec::FlushOp flushOp = StreamCodec::FlushOp::NONE,
+      bool empty = false) {
+    auto input = in;
+    auto output = empty ? MutableByteRange{} : out;
+    return codec_->compressStream(input, output, flushOp);
+  };
+  auto uncompress = [&](
+      StreamCodec::FlushOp flushOp = StreamCodec::FlushOp::NONE,
+      bool empty = false) {
+    auto input = in;
+    auto output = empty ? MutableByteRange{} : out;
+    return codec_->uncompressStream(input, output, flushOp);
+  };
+
+  // compression flow
+  codec_->resetStream();
+  EXPECT_FALSE(compress());
+  EXPECT_FALSE(compress());
+  EXPECT_TRUE(compress(StreamCodec::FlushOp::FLUSH));
+  EXPECT_FALSE(compress());
+  EXPECT_TRUE(compress(StreamCodec::FlushOp::END));
+  // uncompression flow
+  codec_->resetStream();
+  EXPECT_FALSE(uncompress(StreamCodec::FlushOp::NONE, true));
+  codec_->resetStream();
+  EXPECT_FALSE(uncompress(StreamCodec::FlushOp::FLUSH, true));
+  codec_->resetStream();
+  EXPECT_FALSE(uncompress(StreamCodec::FlushOp::NONE, true));
+  codec_->resetStream();
+  EXPECT_FALSE(uncompress(StreamCodec::FlushOp::NONE, true));
+  codec_->resetStream();
+  EXPECT_TRUE(uncompress(StreamCodec::FlushOp::FLUSH));
+  // compress -> uncompress
+  codec_->resetStream();
+  EXPECT_FALSE(compress());
+  EXPECT_THROW(uncompress(), std::logic_error);
+  // uncompress -> compress
+  codec_->resetStream();
+  EXPECT_TRUE(uncompress(StreamCodec::FlushOp::FLUSH));
+  EXPECT_THROW(compress(), std::logic_error);
+  // end -> compress
+  codec_->resetStream();
+  EXPECT_FALSE(compress());
+  EXPECT_TRUE(compress(StreamCodec::FlushOp::END));
+  EXPECT_THROW(compress(), std::logic_error);
+  // end -> uncompress
+  codec_->resetStream();
+  EXPECT_TRUE(uncompress(StreamCodec::FlushOp::FLUSH));
+  EXPECT_THROW(uncompress(), std::logic_error);
+  // flush -> compress
+  codec_->resetStream();
+  EXPECT_FALSE(compress(StreamCodec::FlushOp::FLUSH, true));
+  EXPECT_THROW(compress(), std::logic_error);
+  // flush -> end
+  codec_->resetStream();
+  EXPECT_FALSE(compress(StreamCodec::FlushOp::FLUSH, true));
+  EXPECT_THROW(compress(StreamCodec::FlushOp::END), std::logic_error);
+  // undefined -> compress
+  codec_->compress(inBuffer.get());
+  EXPECT_THROW(compress(), std::logic_error);
+  codec_->uncompress(compressed.get());
+  EXPECT_THROW(compress(), std::logic_error);
+  // undefined -> undefined
+  codec_->uncompress(compressed.get());
+  codec_->compress(inBuffer.get());
+}
+
+INSTANTIATE_TEST_CASE_P(
+    StreamingUnitTest,
+    StreamingUnitTest,
+    testing::ValuesIn(availableStreamCodecs()));
+
+class StreamingCompressionTest
+    : public testing::TestWithParam<std::tuple<int, int, CodecType>> {
+ protected:
+  void SetUp() override {
+    auto const tup = GetParam();
+    uncompressedLength_ = uint64_t(1) << std::get<0>(tup);
+    chunkSize_ = size_t(1) << std::get<1>(tup);
+    codec_ = getStreamCodec(std::get<2>(tup));
+  }
+
+  void runResetStreamTest(DataHolder const& dh);
+  void runCompressStreamTest(DataHolder const& dh);
+  void runUncompressStreamTest(DataHolder const& dh);
+  void runFlushTest(DataHolder const& dh);
+
+ private:
+  std::vector<ByteRange> split(ByteRange data) const;
+
+  uint64_t uncompressedLength_;
+  size_t chunkSize_;
+  std::unique_ptr<StreamCodec> codec_;
+};
+
+std::vector<ByteRange> StreamingCompressionTest::split(ByteRange data) const {
+  size_t const pieces = std::max<size_t>(1, data.size() / chunkSize_);
+  std::vector<ByteRange> result;
+  result.reserve(pieces + 1);
+  while (!data.empty()) {
+    size_t const pieceSize = std::min(data.size(), chunkSize_);
+    result.push_back(data.subpiece(0, pieceSize));
+    data.uncheckedAdvance(pieceSize);
+  }
+  return result;
+}
+
+static std::unique_ptr<IOBuf> compressSome(
+    StreamCodec* codec,
+    ByteRange data,
+    uint64_t bufferSize,
+    StreamCodec::FlushOp flush) {
+  bool result;
+  IOBufQueue queue;
+  do {
+    auto buffer = IOBuf::create(bufferSize);
+    buffer->append(buffer->capacity());
+    MutableByteRange output{buffer->writableData(), buffer->length()};
+
+    result = codec->compressStream(data, output, flush);
+    buffer->trimEnd(output.size());
+    queue.append(std::move(buffer));
+
+  } while (!(flush == StreamCodec::FlushOp::NONE && data.empty()) && !result);
+  EXPECT_TRUE(data.empty());
+  return queue.move();
+}
+
+static std::pair<bool, std::unique_ptr<IOBuf>> uncompressSome(
+    StreamCodec* codec,
+    ByteRange& data,
+    uint64_t bufferSize,
+    StreamCodec::FlushOp flush) {
+  bool result;
+  IOBufQueue queue;
+  do {
+    auto buffer = IOBuf::create(bufferSize);
+    buffer->append(buffer->capacity());
+    MutableByteRange output{buffer->writableData(), buffer->length()};
+
+    result = codec->uncompressStream(data, output, flush);
+    buffer->trimEnd(output.size());
+    queue.append(std::move(buffer));
+
+  } while (queue.tailroom() == 0 && !result);
+  return std::make_pair(result, queue.move());
+}
+
+void StreamingCompressionTest::runResetStreamTest(DataHolder const& dh) {
+  auto const input = dh.data(uncompressedLength_);
+  // Compress some but leave state unclean
+  codec_->resetStream(uncompressedLength_);
+  compressSome(codec_.get(), input, chunkSize_, StreamCodec::FlushOp::NONE);
+  // Reset stream and compress all
+  codec_->resetStream();
+  auto compressed =
+      compressSome(codec_.get(), input, chunkSize_, StreamCodec::FlushOp::END);
+  auto const uncompressed = codec_->uncompress(compressed.get(), input.size());
+  EXPECT_EQ(dh.hash(uncompressedLength_), hashIOBuf(uncompressed.get()));
+}
+
+TEST_P(StreamingCompressionTest, resetStream) {
+  runResetStreamTest(constantDataHolder);
+  runResetStreamTest(randomDataHolder);
+}
+
+void StreamingCompressionTest::runCompressStreamTest(
+    const folly::io::test::DataHolder& dh) {
+  auto const inputs = split(dh.data(uncompressedLength_));
+
+  IOBufQueue queue;
+  codec_->resetStream(uncompressedLength_);
+  // Compress many inputs in a row
+  for (auto const input : inputs) {
+    queue.append(compressSome(
+        codec_.get(), input, chunkSize_, StreamCodec::FlushOp::NONE));
+  }
+  // Finish the operation with empty input.
+  ByteRange empty;
+  queue.append(
+      compressSome(codec_.get(), empty, chunkSize_, StreamCodec::FlushOp::END));
+
+  auto const uncompressed = codec_->uncompress(queue.front());
+  EXPECT_EQ(dh.hash(uncompressedLength_), hashIOBuf(uncompressed.get()));
+}
+
+TEST_P(StreamingCompressionTest, compressStream) {
+  runCompressStreamTest(constantDataHolder);
+  runCompressStreamTest(randomDataHolder);
+}
+
+void StreamingCompressionTest::runUncompressStreamTest(
+    const folly::io::test::DataHolder& dh) {
+  auto const data = IOBuf::wrapBuffer(dh.data(uncompressedLength_));
+  // Concatenate 3 compressed frames in a row
+  auto compressed = codec_->compress(data.get());
+  compressed->prependChain(codec_->compress(data.get()));
+  compressed->prependChain(codec_->compress(data.get()));
+  // Pass all 3 compressed frames in one input buffer
+  auto input = compressed->coalesce();
+  // Uncompress the first frame
+  codec_->resetStream(data->computeChainDataLength());
+  {
+    auto const result = uncompressSome(
+        codec_.get(), input, chunkSize_, StreamCodec::FlushOp::FLUSH);
+    ASSERT_TRUE(result.first);
+    ASSERT_EQ(hashIOBuf(data.get()), hashIOBuf(result.second.get()));
+  }
+  // Uncompress the second frame
+  codec_->resetStream();
+  {
+    auto const result = uncompressSome(
+        codec_.get(), input, chunkSize_, StreamCodec::FlushOp::END);
+    ASSERT_TRUE(result.first);
+    ASSERT_EQ(hashIOBuf(data.get()), hashIOBuf(result.second.get()));
+  }
+  // Uncompress the third frame
+  codec_->resetStream();
+  {
+    auto const result = uncompressSome(
+        codec_.get(), input, chunkSize_, StreamCodec::FlushOp::FLUSH);
+    ASSERT_TRUE(result.first);
+    ASSERT_EQ(hashIOBuf(data.get()), hashIOBuf(result.second.get()));
+  }
+  EXPECT_TRUE(input.empty());
+}
+
+TEST_P(StreamingCompressionTest, uncompressStream) {
+  runUncompressStreamTest(constantDataHolder);
+  runUncompressStreamTest(randomDataHolder);
+}
+
+void StreamingCompressionTest::runFlushTest(DataHolder const& dh) {
+  auto const inputs = split(dh.data(uncompressedLength_));
+  auto uncodec = getStreamCodec(codec_->type());
+
+  codec_->resetStream();
+  for (auto input : inputs) {
+    // Compress some data and flush the stream
+    auto compressed = compressSome(
+        codec_.get(), input, chunkSize_, StreamCodec::FlushOp::FLUSH);
+    auto compressedRange = compressed->coalesce();
+    // Uncompress the compressed data
+    auto result = uncompressSome(
+        uncodec.get(),
+        compressedRange,
+        chunkSize_,
+        StreamCodec::FlushOp::FLUSH);
+    // All compressed data should have been consumed
+    EXPECT_TRUE(compressedRange.empty());
+    // The frame isn't complete
+    EXPECT_FALSE(result.first);
+    // The uncompressed data should be exactly the input data
+    EXPECT_EQ(input.size(), result.second->computeChainDataLength());
+    auto const data = IOBuf::wrapBuffer(input);
+    EXPECT_EQ(hashIOBuf(data.get()), hashIOBuf(result.second.get()));
+  }
+}
+
+TEST_P(StreamingCompressionTest, testFlush) {
+  runFlushTest(constantDataHolder);
+  runFlushTest(randomDataHolder);
+}
+
+INSTANTIATE_TEST_CASE_P(
+    StreamingCompressionTest,
+    StreamingCompressionTest,
+    testing::Combine(
+        testing::Values(0, 1, 12, 22, 27),
+        testing::Values(12, 17, 20),
+        testing::ValuesIn(availableStreamCodecs())));
+
 class AutomaticCodecTest : public testing::TestWithParam<CodecType> {
  protected:
   void SetUp() override {
@@ -499,6 +930,10 @@ class CustomCodec : public Codec {
     return {prefix_};
   }
 
+  uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override {
+    return codec_->maxCompressedLength(uncompressedLength) + prefix_.size();
+  }
+
   bool canUncompress(const IOBuf* data, Optional<uint64_t>) const override {
     auto clone = data->cloneCoalescedAsValue();
     if (clone.length() < prefix_.size()) {