/**
* Zlib codec
*/
-class ZlibCodec final : public Codec {
+class ZlibStreamCodec final : public StreamCodec {
public:
- static std::unique_ptr<Codec> create(int level, CodecType type);
- explicit ZlibCodec(int level, CodecType type);
+ static std::unique_ptr<Codec> createCodec(int level, CodecType type);
+ static std::unique_ptr<StreamCodec> createStream(int level, CodecType type);
+ explicit ZlibStreamCodec(int level, CodecType type);
+ ~ZlibStreamCodec();
std::vector<std::string> validPrefixes() const override;
bool canUncompress(const IOBuf* data, Optional<uint64_t> uncompressedLength)
private:
uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
- std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
- std::unique_ptr<IOBuf> doUncompress(
- const IOBuf* data,
- Optional<uint64_t> uncompressedLength) override;
- std::unique_ptr<IOBuf> addOutputBuffer(z_stream* stream, uint32_t length);
- bool doInflate(z_stream* stream, IOBuf* head, uint32_t bufferLength);
+ void doResetStream() override;
+ bool doCompressStream(
+ ByteRange& input,
+ MutableByteRange& output,
+ StreamCodec::FlushOp flush) override;
+ bool doUncompressStream(
+ ByteRange& input,
+ MutableByteRange& output,
+ StreamCodec::FlushOp flush) override;
+ void resetDeflateStream();
+ void resetInflateStream();
+
+ Optional<z_stream> deflateStream_{};
+ Optional<z_stream> inflateStream_{};
int level_;
+ bool needReset_{true};
};
static constexpr uint16_t kGZIPMagicLE = 0x8B1F;
-std::vector<std::string> ZlibCodec::validPrefixes() const {
+std::vector<std::string> ZlibStreamCodec::validPrefixes() const {
if (type() == CodecType::ZLIB) {
// Zlib streams start with a 2 byte header.
//
}
}
-bool ZlibCodec::canUncompress(const IOBuf* data, Optional<uint64_t>) const {
+bool ZlibStreamCodec::canUncompress(const IOBuf* data, Optional<uint64_t>)
+ const {
if (type() == CodecType::ZLIB) {
uint16_t value;
Cursor cursor{data};
}
}
-uint64_t ZlibCodec::doMaxCompressedLength(uint64_t uncompressedLength) const {
+uint64_t ZlibStreamCodec::doMaxCompressedLength(
+ uint64_t uncompressedLength) const {
return deflateBound(nullptr, uncompressedLength);
}
-std::unique_ptr<Codec> ZlibCodec::create(int level, CodecType type) {
- return std::make_unique<ZlibCodec>(level, type);
+std::unique_ptr<Codec> ZlibStreamCodec::createCodec(int level, CodecType type) {
+ return std::make_unique<ZlibStreamCodec>(level, type);
}
-ZlibCodec::ZlibCodec(int level, CodecType type) : Codec(type) {
+std::unique_ptr<StreamCodec> ZlibStreamCodec::createStream(
+ int level,
+ CodecType type) {
+ return std::make_unique<ZlibStreamCodec>(level, type);
+}
+
+ZlibStreamCodec::ZlibStreamCodec(int level, CodecType type)
+ : StreamCodec(type) {
DCHECK(type == CodecType::ZLIB || type == CodecType::GZIP);
switch (level) {
- case COMPRESSION_LEVEL_FASTEST:
- level = 1;
- break;
- case COMPRESSION_LEVEL_DEFAULT:
- level = Z_DEFAULT_COMPRESSION;
- break;
- case COMPRESSION_LEVEL_BEST:
- level = 9;
- break;
+ case COMPRESSION_LEVEL_FASTEST:
+ level = 1;
+ break;
+ case COMPRESSION_LEVEL_DEFAULT:
+ level = Z_DEFAULT_COMPRESSION;
+ break;
+ case COMPRESSION_LEVEL_BEST:
+ level = 9;
+ break;
}
if (level != Z_DEFAULT_COMPRESSION && (level < 0 || level > 9)) {
- throw std::invalid_argument(to<std::string>(
- "ZlibCodec: invalid level: ", level));
+ throw std::invalid_argument(
+ to<std::string>("ZlibStreamCodec: invalid level: ", level));
}
level_ = level;
}
-std::unique_ptr<IOBuf> ZlibCodec::addOutputBuffer(z_stream* stream,
- uint32_t length) {
- CHECK_EQ(stream->avail_out, 0);
-
- auto buf = IOBuf::create(length);
- buf->append(buf->capacity());
-
- stream->next_out = buf->writableData();
- stream->avail_out = buf->length();
-
- return buf;
-}
-
-bool ZlibCodec::doInflate(z_stream* stream,
- IOBuf* head,
- uint32_t bufferLength) {
- if (stream->avail_out == 0) {
- head->prependChain(addOutputBuffer(stream, bufferLength));
+ZlibStreamCodec::~ZlibStreamCodec() {
+ if (deflateStream_) {
+ deflateEnd(deflateStream_.get_pointer());
+ deflateStream_.clear();
}
-
- int rc = inflate(stream, Z_NO_FLUSH);
-
- switch (rc) {
- case Z_OK:
- break;
- case Z_STREAM_END:
- return true;
- case Z_BUF_ERROR:
- case Z_NEED_DICT:
- case Z_DATA_ERROR:
- case Z_MEM_ERROR:
- throw std::runtime_error(to<std::string>(
- "ZlibCodec: inflate error: ", rc, ": ", stream->msg));
- default:
- CHECK(false) << rc << ": " << stream->msg;
+ if (inflateStream_) {
+ inflateEnd(inflateStream_.get_pointer());
+ inflateStream_.clear();
}
-
- return false;
}
-std::unique_ptr<IOBuf> ZlibCodec::doCompress(const IOBuf* data) {
- z_stream stream;
- stream.zalloc = nullptr;
- stream.zfree = nullptr;
- stream.opaque = nullptr;
+void ZlibStreamCodec::doResetStream() {
+ needReset_ = true;
+}
+void ZlibStreamCodec::resetDeflateStream() {
+ if (deflateStream_) {
+ int const rc = deflateReset(deflateStream_.get_pointer());
+ if (rc != Z_OK) {
+ deflateStream_.clear();
+ throw std::runtime_error(
+ to<std::string>("ZlibStreamCodec: deflateReset error: ", rc));
+ }
+ return;
+ }
+ deflateStream_ = z_stream{};
// Using deflateInit2() to support gzip. "The windowBits parameter is the
// base two logarithm of the maximum window size (...) The default value is
// 15 (...) Add 16 to windowBits to write a simple gzip header and trailer
// will have no file name, no extra data, no comment, no modification time
// (set to zero), no header crc, and the operating system will be set to 255
// (unknown)."
- int windowBits = 15 + (type() == CodecType::GZIP ? 16 : 0);
+ int const windowBits = 15 + (type() == CodecType::GZIP ? 16 : 0);
// All other parameters (method, memLevel, strategy) get default values from
// the zlib manual.
- int rc = deflateInit2(&stream,
- level_,
- Z_DEFLATED,
- windowBits,
- /* memLevel */ 8,
- Z_DEFAULT_STRATEGY);
+ int const rc = deflateInit2(
+ deflateStream_.get_pointer(),
+ level_,
+ Z_DEFLATED,
+ windowBits,
+ /* memLevel */ 8,
+ Z_DEFAULT_STRATEGY);
if (rc != Z_OK) {
- throw std::runtime_error(to<std::string>(
- "ZlibCodec: deflateInit error: ", rc, ": ", stream.msg));
+ deflateStream_.clear();
+ throw std::runtime_error(
+ to<std::string>("ZlibStreamCodec: deflateInit error: ", rc));
}
+}
- stream.next_in = stream.next_out = nullptr;
- stream.avail_in = stream.avail_out = 0;
- stream.total_in = stream.total_out = 0;
-
- bool success = false;
-
- SCOPE_EXIT {
- rc = deflateEnd(&stream);
- // If we're here because of an exception, it's okay if some data
- // got dropped.
- CHECK(rc == Z_OK || (!success && rc == Z_DATA_ERROR))
- << rc << ": " << stream.msg;
- };
-
- uint64_t uncompressedLength = data->computeChainDataLength();
- uint64_t maxCompressedLength = deflateBound(&stream, uncompressedLength);
-
- // Max 64MiB in one go
- constexpr uint32_t maxSingleStepLength = uint32_t(64) << 20; // 64MiB
- constexpr uint32_t defaultBufferLength = uint32_t(4) << 20; // 4MiB
-
- auto out = addOutputBuffer(
- &stream,
- (maxCompressedLength <= maxSingleStepLength ?
- maxCompressedLength :
- defaultBufferLength));
-
- for (auto& range : *data) {
- uint64_t remaining = range.size();
- uint64_t written = 0;
- while (remaining) {
- uint32_t step = (remaining > maxSingleStepLength ?
- maxSingleStepLength : remaining);
- stream.next_in = const_cast<uint8_t*>(range.data() + written);
- stream.avail_in = step;
- remaining -= step;
- written += step;
-
- while (stream.avail_in != 0) {
- if (stream.avail_out == 0) {
- out->prependChain(addOutputBuffer(&stream, defaultBufferLength));
- }
-
- rc = deflate(&stream, Z_NO_FLUSH);
-
- CHECK_EQ(rc, Z_OK) << stream.msg;
- }
+void ZlibStreamCodec::resetInflateStream() {
+ if (inflateStream_) {
+ int const rc = inflateReset(inflateStream_.get_pointer());
+ if (rc != Z_OK) {
+ inflateStream_.clear();
+ throw std::runtime_error(
+ to<std::string>("ZlibStreamCodec: inflateReset error: ", rc));
}
+ return;
}
-
- do {
- if (stream.avail_out == 0) {
- out->prependChain(addOutputBuffer(&stream, defaultBufferLength));
- }
-
- rc = deflate(&stream, Z_FINISH);
- } while (rc == Z_OK);
-
- CHECK_EQ(rc, Z_STREAM_END) << stream.msg;
-
- out->prev()->trimEnd(stream.avail_out);
-
- success = true; // we survived
-
- return out;
-}
-
-std::unique_ptr<IOBuf> ZlibCodec::doUncompress(
- const IOBuf* data,
- Optional<uint64_t> uncompressedLength) {
- z_stream stream;
- stream.zalloc = nullptr;
- stream.zfree = nullptr;
- stream.opaque = nullptr;
-
+ inflateStream_ = z_stream{};
// "The windowBits parameter is the base two logarithm of the maximum window
// size (...) The default value is 15 (...) add 16 to decode only the gzip
// format (the zlib format will return a Z_DATA_ERROR)."
- int windowBits = 15 + (type() == CodecType::GZIP ? 16 : 0);
- int rc = inflateInit2(&stream, windowBits);
+ int const windowBits = 15 + (type() == CodecType::GZIP ? 16 : 0);
+ int const rc = inflateInit2(inflateStream_.get_pointer(), windowBits);
if (rc != Z_OK) {
- throw std::runtime_error(to<std::string>(
- "ZlibCodec: inflateInit error: ", rc, ": ", stream.msg));
+ inflateStream_.clear();
+ throw std::runtime_error(
+ to<std::string>("ZlibStreamCodec: inflateInit error: ", rc));
}
+}
- stream.next_in = stream.next_out = nullptr;
- stream.avail_in = stream.avail_out = 0;
- stream.total_in = stream.total_out = 0;
+static int zlibTranslateFlush(StreamCodec::FlushOp flush) {
+ switch (flush) {
+ case StreamCodec::FlushOp::NONE:
+ return Z_NO_FLUSH;
+ case StreamCodec::FlushOp::FLUSH:
+ return Z_SYNC_FLUSH;
+ case StreamCodec::FlushOp::END:
+ return Z_FINISH;
+ default:
+ throw std::invalid_argument("ZlibStreamCodec: Invalid flush");
+ }
+}
- bool success = false;
+static int zlibThrowOnError(int rc) {
+ switch (rc) {
+ case Z_OK:
+ case Z_BUF_ERROR:
+ case Z_STREAM_END:
+ return rc;
+ default:
+ throw std::runtime_error(to<std::string>("ZlibStreamCodec: error: ", rc));
+ }
+}
+bool ZlibStreamCodec::doCompressStream(
+ ByteRange& input,
+ MutableByteRange& output,
+ StreamCodec::FlushOp flush) {
+ if (needReset_) {
+ resetDeflateStream();
+ needReset_ = false;
+ }
+ DCHECK(deflateStream_.hasValue());
+ // zlib will return Z_STREAM_ERROR if output.data() is null.
+ if (output.data() == nullptr) {
+ return false;
+ }
+ deflateStream_->next_in = const_cast<uint8_t*>(input.data());
+ deflateStream_->avail_in = input.size();
+ deflateStream_->next_out = output.data();
+ deflateStream_->avail_out = output.size();
SCOPE_EXIT {
- rc = inflateEnd(&stream);
- // If we're here because of an exception, it's okay if some data
- // got dropped.
- CHECK(rc == Z_OK || (!success && rc == Z_DATA_ERROR))
- << rc << ": " << stream.msg;
+ input.uncheckedAdvance(input.size() - deflateStream_->avail_in);
+ output.uncheckedAdvance(output.size() - deflateStream_->avail_out);
};
-
- // Max 64MiB in one go
- constexpr uint64_t maxSingleStepLength = uint64_t(64) << 20; // 64MiB
- constexpr uint64_t kBlockSize = uint64_t(32) << 10; // 32 KiB
- const uint64_t defaultBufferLength =
- computeBufferLength(data->computeChainDataLength(), kBlockSize);
-
- auto out = addOutputBuffer(
- &stream,
- ((uncompressedLength && *uncompressedLength <= maxSingleStepLength)
- ? *uncompressedLength
- : defaultBufferLength));
-
- bool streamEnd = false;
- for (auto& range : *data) {
- if (range.empty()) {
- continue;
- }
-
- stream.next_in = const_cast<uint8_t*>(range.data());
- stream.avail_in = range.size();
-
- while (stream.avail_in != 0) {
- if (streamEnd) {
- throw std::runtime_error(to<std::string>(
- "ZlibCodec: junk after end of data"));
- }
-
- streamEnd = doInflate(&stream, out.get(), defaultBufferLength);
- }
+ int const rc = zlibThrowOnError(
+ deflate(deflateStream_.get_pointer(), zlibTranslateFlush(flush)));
+ switch (flush) {
+ case StreamCodec::FlushOp::NONE:
+ return false;
+ case StreamCodec::FlushOp::FLUSH:
+ return deflateStream_->avail_in == 0 && deflateStream_->avail_out != 0;
+ case StreamCodec::FlushOp::END:
+ return rc == Z_STREAM_END;
+ default:
+ throw std::invalid_argument("ZlibStreamCodec: Invalid flush");
}
+}
- while (!streamEnd) {
- streamEnd = doInflate(&stream, out.get(), defaultBufferLength);
+bool ZlibStreamCodec::doUncompressStream(
+ ByteRange& input,
+ MutableByteRange& output,
+ StreamCodec::FlushOp flush) {
+ if (needReset_) {
+ resetInflateStream();
+ needReset_ = false;
}
-
- out->prev()->trimEnd(stream.avail_out);
-
- if (uncompressedLength && *uncompressedLength != stream.total_out) {
- throw std::runtime_error(
- to<std::string>("ZlibCodec: invalid uncompressed length"));
+ DCHECK(inflateStream_.hasValue());
+ // zlib will return Z_STREAM_ERROR if output.data() is null.
+ if (output.data() == nullptr) {
+ return false;
}
-
- success = true; // we survived
-
- return out;
+ inflateStream_->next_in = const_cast<uint8_t*>(input.data());
+ inflateStream_->avail_in = input.size();
+ inflateStream_->next_out = output.data();
+ inflateStream_->avail_out = output.size();
+ SCOPE_EXIT {
+ input.advance(input.size() - inflateStream_->avail_in);
+ output.advance(output.size() - inflateStream_->avail_out);
+ };
+ int const rc = zlibThrowOnError(
+ inflate(inflateStream_.get_pointer(), zlibTranslateFlush(flush)));
+ return rc == Z_STREAM_END;
}
-#endif // FOLLY_HAVE_LIBZ
+#endif // FOLLY_HAVE_LIBZ
#if FOLLY_HAVE_LIBLZMA
#endif
#if FOLLY_HAVE_LIBZ
- {ZlibCodec::create, nullptr},
+ {ZlibStreamCodec::createCodec, ZlibStreamCodec::createStream},
#else
{},
#endif
#endif
#if FOLLY_HAVE_LIBZ
- {ZlibCodec::create, nullptr},
+ {ZlibStreamCodec::createCodec, ZlibStreamCodec::createStream},
#else
{},
#endif