2 * Copyright 2013 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #include "folly/io/Compression.h"
21 #include <glog/logging.h>
23 #include <snappy-sinksource.h>
26 #include "folly/Conv.h"
27 #include "folly/Memory.h"
28 #include "folly/Portability.h"
29 #include "folly/ScopeGuard.h"
30 #include "folly/io/Cursor.h"
32 namespace folly { namespace io {
// Public compression entry point. When data->empty() is true the result is
// IOBuf::create(0) and the codec-specific doCompress() hook is never called,
// so every codec produces the same output for empty input.
// NOTE(review): `data` is dereferenced unconditionally, so a null pointer is
// not handled here; the "nullptr case" wording below presumably refers to an
// empty buffer -- confirm.
34 // Ensure consistent behavior in the nullptr case
35 std::unique_ptr<IOBuf> Codec::compress(const IOBuf* data) {
36 return !data->empty() ? doCompress(data) : IOBuf::create(0);
// Public decompression entry point: validates `uncompressedLength` against
// this codec's limits and then delegates to doUncompress().
//
// uncompressedLength may be UNKNOWN_UNCOMPRESSED_LENGTH unless
// needsUncompressedLength() returns true.
//
// Throws std::invalid_argument if the length is unknown but required.
// Throws std::runtime_error if the length exceeds maxUncompressedLength()
// or fails the zero-length consistency check further down.
39 std::unique_ptr<IOBuf> Codec::uncompress(const IOBuf* data,
40 uint64_t uncompressedLength) {
41 if (uncompressedLength == UNKNOWN_UNCOMPRESSED_LENGTH) {
42 if (needsUncompressedLength()) {
43 throw std::invalid_argument("Codec: uncompressed length required");
45 } else if (uncompressedLength > maxUncompressedLength()) {
46 throw std::runtime_error("Codec: uncompressed length too large");
// NOTE(review): the condition guarding the next check is not shown here;
// presumably it is the empty-input case, mirroring compress() -- confirm.
// A caller-supplied length that is neither unknown nor 0 is rejected;
// otherwise IOBuf::create(0) is returned without invoking the codec.
50 if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
51 uncompressedLength != 0) {
52 throw std::runtime_error("Codec: invalid uncompressed length");
54 return IOBuf::create(0);
// Normal path: hand the validated arguments to the codec implementation.
57 return doUncompress(data, uncompressedLength);
// Reports whether uncompress() must be given the uncompressed length.
// Forwards to the overridable doNeedsUncompressedLength() hook.
60 bool Codec::needsUncompressedLength() const {
61 return doNeedsUncompressedLength();
// Largest uncompressed length uncompress() accepts before throwing
// "uncompressed length too large". Forwards to the overridable
// doMaxUncompressedLength() hook.
64 uint64_t Codec::maxUncompressedLength() const {
65 return doMaxUncompressedLength();
// Returns the CodecType identifying this codec.
// NOTE(review): the body is not shown here; presumably it forwards to
// doType() in the same way the sibling accessors forward to their hooks --
// confirm.
68 CodecType Codec::type() const {
// Base-class default behind needsUncompressedLength(); LZ4Codec declares an
// override.
// NOTE(review): the returned value is not shown here -- confirm.
72 bool Codec::doNeedsUncompressedLength() const {
// Base-class default behind maxUncompressedLength(): one less than the
// largest uint64_t value.
// NOTE(review): presumably the largest value itself is reserved for
// UNKNOWN_UNCOMPRESSED_LENGTH -- confirm against the header.
76 uint64_t Codec::doMaxUncompressedLength() const {
77 return std::numeric_limits<uint64_t>::max() - 1;
// Codec that performs no compression: its doCompress() and doUncompress()
// implementations both return a clone of the input buffer.
85 class NoCompressionCodec FOLLY_FINAL : public Codec {
// Factory function; this is the entry stored in gCodecFactories.
87 static std::unique_ptr<Codec> create(int level);
// Validates `level`; throws std::invalid_argument for unsupported values.
88 explicit NoCompressionCodec(int level);
// Overrides of the Codec implementation hooks.
91 CodecType doType() const FOLLY_OVERRIDE;
92 std::unique_ptr<IOBuf> doCompress(const IOBuf* data) FOLLY_OVERRIDE;
93 std::unique_ptr<IOBuf> doUncompress(
95 uint64_t uncompressedLength) FOLLY_OVERRIDE;
// Factory: constructs a NoCompressionCodec for `level`. The constructor may
// throw std::invalid_argument, which propagates to the caller.
98 std::unique_ptr<Codec> NoCompressionCodec::create(int level) {
99 return make_unique<NoCompressionCodec>(level);
// Validates the requested level. The three symbolic COMPRESSION_LEVEL_*
// constants are handled explicitly; an unsupported level throws
// std::invalid_argument with the offending value in the message.
// NOTE(review): the switch header, the per-case action and the condition
// guarding the throw are not shown here -- confirm which numeric levels are
// accepted.
102 NoCompressionCodec::NoCompressionCodec(int level) {
104 case COMPRESSION_LEVEL_DEFAULT:
105 case COMPRESSION_LEVEL_FASTEST:
106 case COMPRESSION_LEVEL_BEST:
110 throw std::invalid_argument(to<std::string>(
111 "NoCompressionCodec: invalid level ", level));
// Identifies this codec as CodecType::NO_COMPRESSION.
115 CodecType NoCompressionCodec::doType() const {
116 return CodecType::NO_COMPRESSION;
// "Compresses" by returning a clone of the input; no bytes are transformed.
119 std::unique_ptr<IOBuf> NoCompressionCodec::doCompress(
121 return data->clone();
// "Uncompresses" by returning a clone of the input.
// If the caller supplied an uncompressed length, it must equal the total
// data length of the input chain; otherwise std::runtime_error is thrown.
124 std::unique_ptr<IOBuf> NoCompressionCodec::doUncompress(
126 uint64_t uncompressedLength) {
127 if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
128 data->computeChainDataLength() != uncompressedLength) {
// to<std::string> receives a single literal here, so it only converts the
// message to std::string.
129 throw std::runtime_error(to<std::string>(
130 "NoCompressionCodec: invalid uncompressed length"));
132 return data->clone();
// Codec backed by the LZ4 library (LZ4_compress / LZ4_compressHC /
// LZ4_uncompress are called by the implementation).
138 class LZ4Codec FOLLY_FINAL : public Codec {
// Factory function wrapping the constructor.
140 static std::unique_ptr<Codec> create(int level);
// Validates `level`; throws std::invalid_argument for unsupported values.
141 explicit LZ4Codec(int level);
// Overrides of the Codec implementation hooks.
144 bool doNeedsUncompressedLength() const FOLLY_OVERRIDE;
145 uint64_t doMaxUncompressedLength() const FOLLY_OVERRIDE;
146 CodecType doType() const FOLLY_OVERRIDE;
147 std::unique_ptr<IOBuf> doCompress(const IOBuf* data) FOLLY_OVERRIDE;
148 std::unique_ptr<IOBuf> doUncompress(
150 uint64_t uncompressedLength) FOLLY_OVERRIDE;
// Set by the constructor to (level > 1); read by doCompress() to choose
// between the two LZ4 compression functions.
152 bool highCompression_;
// Factory: constructs an LZ4Codec for `level`. The constructor may throw
// std::invalid_argument, which propagates to the caller.
155 std::unique_ptr<Codec> LZ4Codec::create(int level) {
156 return make_unique<LZ4Codec>(level);
// Validates the requested level and derives highCompression_ from it.
// After the symbolic COMPRESSION_LEVEL_* constants are handled, only the
// numeric levels 1 and 2 pass the range check; anything else throws
// std::invalid_argument. Level 2 turns on high-compression mode.
// NOTE(review): the switch header and the value assigned in each case are
// not shown here -- confirm the symbolic-to-numeric mapping.
159 LZ4Codec::LZ4Codec(int level) {
161 case COMPRESSION_LEVEL_FASTEST:
162 case COMPRESSION_LEVEL_DEFAULT:
165 case COMPRESSION_LEVEL_BEST:
169 if (level < 1 || level > 2) {
170 throw std::invalid_argument(to<std::string>(
171 "LZ4Codec: invalid level: ", level));
173 highCompression_ = (level > 1);
// LZ4 override of the needsUncompressedLength() hook.
// NOTE(review): the returned value is not shown here. doUncompress() sizes
// its output buffer directly from uncompressedLength, so presumably this
// reports that the length is required -- confirm.
176 bool LZ4Codec::doNeedsUncompressedLength() const {
// Upper bound on the uncompressed length accepted for LZ4: 1.8 * 2^30 bytes,
// chosen to stay below the ~1.9GB limit quoted from lz4.h below.
// The product is computed as a double and converted implicitly to uint64_t
// on return.
180 uint64_t LZ4Codec::doMaxUncompressedLength() const {
181 // From lz4.h: "Max supported value is ~1.9GB"; I wish we had something
183 return 1.8 * (uint64_t(1) << 30);
// Identifies this codec as CodecType::LZ4.
186 CodecType LZ4Codec::doType() const {
187 return CodecType::LZ4;
// Compresses `data` with LZ4 into one output buffer whose capacity comes
// from LZ4_compressBound() for the input length.
//
// A chained input is cloned first because LZ4 has no streaming interface
// (see the comment inside). highCompression_ is set by the constructor to
// (level > 1) and selects LZ4_compressHC; otherwise LZ4_compress is used.
190 std::unique_ptr<IOBuf> LZ4Codec::doCompress(const IOBuf* data) {
191 std::unique_ptr<IOBuf> clone;
192 if (data->isChained()) {
193 // LZ4 doesn't support streaming, so we have to coalesce
194 clone = data->clone();
199 auto out = IOBuf::create(LZ4_compressBound(data->length()));
// Fix: the two calls below used to be swapped, so asking for high
// compression ran the fast compressor and the fast levels ran the HC one.
201 if (highCompression_) {
202 n = LZ4_compressHC(reinterpret_cast<const char*>(data->data()),
203 reinterpret_cast<char*>(out->writableTail()),
206 n = LZ4_compress(reinterpret_cast<const char*>(data->data()),
207 reinterpret_cast<char*>(out->writableTail()),
// Sanity check: the reported compressed size must fit in the buffer that
// was allocated for it.
212 CHECK_LE(n, out->capacity());
// Decompresses an LZ4 block into one buffer created with a capacity of
// `uncompressedLength`, which is also the amount appended to the output
// once decompression succeeds.
//
// A chained input is cloned first because LZ4 has no streaming interface.
// Throws std::runtime_error when the value returned by LZ4_uncompress does
// not equal the input length.
218 std::unique_ptr<IOBuf> LZ4Codec::doUncompress(
220 uint64_t uncompressedLength) {
221 std::unique_ptr<IOBuf> clone;
222 if (data->isChained()) {
223 // LZ4 doesn't support streaming, so we have to coalesce
224 clone = data->clone();
229 auto out = IOBuf::create(uncompressedLength);
230 int n = LZ4_uncompress(reinterpret_cast<const char*>(data->data()),
231 reinterpret_cast<char*>(out->writableTail()),
// `n` is a signed int compared against data->length().
// NOTE(review): presumably a negative error code from LZ4 also fails this
// comparison after conversion -- confirm the type of length().
233 if (n != data->length()) {
234 throw std::runtime_error(to<std::string>(
235 "LZ4 decompression returned invalid value ", n));
// Mark the decompressed bytes as valid data in the output buffer.
237 out->append(uncompressedLength);
246 * Implementation of snappy::Source that reads from an IOBuf chain.
// Adapter that implements the snappy::Source interface over an IOBuf so
// snappy can read its input from it.
248 class IOBufSnappySource FOLLY_FINAL : public snappy::Source {
// `data` is the buffer to read from; its total chain length is computed at
// construction time.
250 explicit IOBufSnappySource(const IOBuf* data);
// Overrides of the snappy::Source interface.
251 size_t Available() const FOLLY_OVERRIDE;
252 const char* Peek(size_t* len) FOLLY_OVERRIDE;
253 void Skip(size_t n) FOLLY_OVERRIDE;
// Records the total data length of the chain as the number of bytes
// available.
// NOTE(review): the remaining member initialisers (e.g. for cursor_) are
// not shown here -- confirm.
259 IOBufSnappySource::IOBufSnappySource(const IOBuf* data)
260 : available_(data->computeChainDataLength()),
// snappy::Source override reporting how many bytes are available.
// NOTE(review): the body is not shown here; presumably it returns
// available_ -- confirm.
264 size_t IOBufSnappySource::Available() const {
// snappy::Source override: returns a pointer obtained from cursor_.peek()
// (the first element of the returned pair, cast to const char*).
// NOTE(review): the statement that stores the peeked length into *len is
// not shown here -- confirm.
268 const char* IOBufSnappySource::Peek(size_t* len) {
269 auto p = cursor_.peek();
271 return reinterpret_cast<const char*>(p.first);
// snappy::Source override: skips `n` bytes of input.
// Asking to skip more than is available is treated as a programming error
// and aborts via CHECK.
// NOTE(review): the statements that advance the cursor and update
// available_ are not shown here -- confirm.
274 void IOBufSnappySource::Skip(size_t n) {
275 CHECK_LE(n, available_);
// Codec backed by the snappy library (snappy::Compress and
// snappy::RawUncompress are called by the implementation).
280 class SnappyCodec FOLLY_FINAL : public Codec {
// Factory function wrapping the constructor.
282 static std::unique_ptr<Codec> create(int level);
// Validates `level`; throws std::invalid_argument for unsupported values.
283 explicit SnappyCodec(int level);
// Overrides of the Codec implementation hooks.
286 uint64_t doMaxUncompressedLength() const FOLLY_OVERRIDE;
287 CodecType doType() const FOLLY_OVERRIDE;
288 std::unique_ptr<IOBuf> doCompress(const IOBuf* data) FOLLY_OVERRIDE;
289 std::unique_ptr<IOBuf> doUncompress(
291 uint64_t uncompressedLength) FOLLY_OVERRIDE;
// Factory: constructs a SnappyCodec for `level`. The constructor may throw
// std::invalid_argument, which propagates to the caller.
294 std::unique_ptr<Codec> SnappyCodec::create(int level) {
295 return make_unique<SnappyCodec>(level);
// Validates the requested level. The three symbolic COMPRESSION_LEVEL_*
// constants share a single case; an unsupported level throws
// std::invalid_argument with the offending value in the message.
// NOTE(review): the switch header, the case action and the condition
// guarding the throw are not shown here -- confirm which numeric levels are
// accepted.
298 SnappyCodec::SnappyCodec(int level) {
300 case COMPRESSION_LEVEL_FASTEST:
301 case COMPRESSION_LEVEL_DEFAULT:
302 case COMPRESSION_LEVEL_BEST:
306 throw std::invalid_argument(to<std::string>(
307 "SnappyCodec: invalid level: ", level));
// Upper bound on the uncompressed length accepted for snappy: the largest
// uint32_t value, for the reason given in the comment below.
311 uint64_t SnappyCodec::doMaxUncompressedLength() const {
312 // snappy.h uses uint32_t for lengths, so there's that.
313 return std::numeric_limits<uint32_t>::max();
// Identifies this codec as CodecType::SNAPPY.
316 CodecType SnappyCodec::doType() const {
317 return CodecType::SNAPPY;
// Compresses `data` with snappy. Input is read through an
// IOBufSnappySource, and output goes into one buffer whose capacity is
// snappy::MaxCompressedLength() of the available input.
320 std::unique_ptr<IOBuf> SnappyCodec::doCompress(const IOBuf* data) {
321 IOBufSnappySource source(data);
323 IOBuf::create(snappy::MaxCompressedLength(source.Available()));
// The sink writes straight into the output buffer's tail and, as its name
// says, performs no bounds checking; the buffer was sized above for that.
325 snappy::UncheckedByteArraySink sink(reinterpret_cast<char*>(
326 out->writableTail()));
328 size_t n = snappy::Compress(&source, &sink);
// Sanity check after the fact: the number of bytes written must fit in the
// buffer's capacity.
330 CHECK_LE(n, out->capacity());
// Decompresses snappy data. The uncompressed length is read from the
// compressed stream itself and, if the caller supplied a length, the two
// must agree.
//
// Throws std::runtime_error if the length cannot be read, if it disagrees
// with `uncompressedLength`, or if snappy::RawUncompress fails.
335 std::unique_ptr<IOBuf> SnappyCodec::doUncompress(const IOBuf* data,
336 uint64_t uncompressedLength) {
337 uint32_t actualUncompressedLength = 0;
// First pass over the input: read only the stored uncompressed length.
340 IOBufSnappySource source(data);
341 if (!snappy::GetUncompressedLength(&source, &actualUncompressedLength)) {
342 throw std::runtime_error("snappy::GetUncompressedLength failed");
344 if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
345 uncompressedLength != actualUncompressedLength) {
346 throw std::runtime_error("snappy: invalid uncompressed length");
350 auto out = IOBuf::create(actualUncompressedLength);
// Second pass uses a fresh source over the same data.
// NOTE(review): presumably because the first source was already advanced
// past the length header -- confirm.
353 IOBufSnappySource source(data);
354 if (!snappy::RawUncompress(&source,
355 reinterpret_cast<char*>(out->writableTail()))) {
356 throw std::runtime_error("snappy::RawUncompress failed");
// Mark the decompressed bytes as valid data in the output buffer.
360 out->append(actualUncompressedLength);
// Codec backed by zlib (deflate/inflate are called by the implementation).
367 class ZlibCodec FOLLY_FINAL : public Codec {
// Factory function wrapping the constructor.
369 static std::unique_ptr<Codec> create(int level);
// Validates `level`; throws std::invalid_argument for unsupported values.
370 explicit ZlibCodec(int level);
// Overrides of the Codec implementation hooks.
373 CodecType doType() const FOLLY_OVERRIDE;
374 std::unique_ptr<IOBuf> doCompress(const IOBuf* data) FOLLY_OVERRIDE;
375 std::unique_ptr<IOBuf> doUncompress(
377 uint64_t uncompressedLength) FOLLY_OVERRIDE;
// Creates a new output buffer of `length` and points the stream's
// next_out/avail_out at it; requires that the stream has no output space
// left.
379 std::unique_ptr<IOBuf> addOutputBuffer(z_stream* stream, uint32_t length);
// Runs one inflate() step, first chaining a new `bufferLength` output
// buffer onto `head` if the stream has no output space left. The result is
// used by doUncompress() as its end-of-stream flag.
380 bool doInflate(z_stream* stream, IOBuf* head, uint32_t bufferLength);
// Factory: constructs a ZlibCodec for `level`. The constructor may throw
// std::invalid_argument, which propagates to the caller.
385 std::unique_ptr<Codec> ZlibCodec::create(int level) {
386 return make_unique<ZlibCodec>(level);
// Translates the symbolic COMPRESSION_LEVEL_* constants into zlib levels
// and validates the result. The accepted values are Z_DEFAULT_COMPRESSION
// or an integer in [0, 9]; anything else throws std::invalid_argument.
// NOTE(review): only the DEFAULT mapping is shown here; the values used for
// FASTEST and BEST and the assignment to level_ are not -- confirm.
389 ZlibCodec::ZlibCodec(int level) {
391 case COMPRESSION_LEVEL_FASTEST:
394 case COMPRESSION_LEVEL_DEFAULT:
395 level = Z_DEFAULT_COMPRESSION;
397 case COMPRESSION_LEVEL_BEST:
401 if (level != Z_DEFAULT_COMPRESSION && (level < 0 || level > 9)) {
402 throw std::invalid_argument(to<std::string>(
403 "ZlibCodec: invalid level: ", level));
// Identifies this codec as CodecType::ZLIB.
408 CodecType ZlibCodec::doType() const {
409 return CodecType::ZLIB;
// Creates a new output buffer of `length` and points the zlib stream's
// output cursor (next_out / avail_out) at it.
// Precondition, enforced with CHECK: the stream has no output space left.
// NOTE(review): the step that gives the buffer a non-zero length() before
// avail_out is read, and the return statement, are not shown here --
// confirm.
412 std::unique_ptr<IOBuf> ZlibCodec::addOutputBuffer(z_stream* stream,
414 CHECK_EQ(stream->avail_out, 0);
416 auto buf = IOBuf::create(length);
419 stream->next_out = buf->writableData();
420 stream->avail_out = buf->length();
// Runs a single inflate() step with Z_NO_FLUSH. If the stream has no output
// space left, a new buffer of `bufferLength` is first appended to the chain
// headed by `head`.
//
// The bool result is stored by doUncompress() as its end-of-stream flag.
// Throws std::runtime_error for the inflate return codes treated as data
// errors; other unexpected codes abort via CHECK.
// NOTE(review): the switch over `rc` is not shown here, so which codes map
// to true/false, throw or CHECK cannot be confirmed from this view.
425 bool ZlibCodec::doInflate(z_stream* stream,
427 uint32_t bufferLength) {
428 if (stream->avail_out == 0) {
429 head->prependChain(addOutputBuffer(stream, bufferLength));
432 int rc = inflate(stream, Z_NO_FLUSH);
// NOTE(review): stream->msg is formatted unconditionally here and below;
// confirm it cannot be null for the codes that reach these lines.
443 throw std::runtime_error(to<std::string>(
444 "ZlibCodec: inflate error: ", rc, ": ", stream->msg));
446 CHECK(false) << rc << ": " << stream->msg;
// Compresses the IOBuf chain `data` with zlib's deflate at level_, returning
// a chain of output buffers.
//
// Throws std::runtime_error if deflateInit fails. Unexpected return codes
// from deflate()/deflateEnd() abort via CHECK.
453 std::unique_ptr<IOBuf> ZlibCodec::doCompress(const IOBuf* data) {
// Null zalloc/zfree/opaque are passed to deflateInit, i.e. no custom
// allocator is supplied.
455 stream.zalloc = nullptr;
456 stream.zfree = nullptr;
457 stream.opaque = nullptr;
459 int rc = deflateInit(&stream, level_);
// NOTE(review): the condition guarding this throw is not shown here;
// presumably rc != Z_OK -- confirm. Also confirm stream.msg is non-null on
// this path.
461 throw std::runtime_error(to<std::string>(
462 "ZlibCodec: deflateInit error: ", rc, ": ", stream.msg));
// Start with no input, no output space and zeroed byte counters; buffers
// are attached below.
465 stream.next_in = stream.next_out = nullptr;
466 stream.avail_in = stream.avail_out = 0;
467 stream.total_in = stream.total_out = 0;
// `success` is flipped at the very end; the cleanup check below uses it to
// tell normal completion from an early exit.
469 bool success = false;
// Stream teardown. Z_DATA_ERROR from deflateEnd is tolerated only when the
// function did not finish successfully.
// NOTE(review): presumably this runs from a scope guard on every exit path;
// the enclosing construct is not shown here -- confirm.
472 int rc = deflateEnd(&stream);
473 // If we're here because of an exception, it's okay if some data
475 CHECK(rc == Z_OK || (!success && rc == Z_DATA_ERROR))
476 << rc << ": " << stream.msg;
// Size the first output buffer from zlib's bound for the whole input when
// that bound is at most 64MiB; otherwise start with a 4MiB buffer and grow
// by chaining.
479 uint64_t uncompressedLength = data->computeChainDataLength();
480 uint64_t maxCompressedLength = deflateBound(&stream, uncompressedLength);
482 // Max 64MiB in one go
483 constexpr uint32_t maxSingleStepLength = uint32_t(64) << 20; // 64MiB
484 constexpr uint32_t defaultBufferLength = uint32_t(4) << 20; // 4MiB
486 auto out = addOutputBuffer(
488 (maxCompressedLength <= maxSingleStepLength ?
489 maxCompressedLength :
490 defaultBufferLength));
// Feed each buffer of the input chain to deflate in turn.
492 for (auto& range : *data) {
// The const_cast is needed to assign the read-only input bytes to
// stream.next_in.
497 stream.next_in = const_cast<uint8_t*>(range.data());
498 stream.avail_in = range.size();
// Consume this input range completely, chaining another 4MiB output buffer
// whenever the current one is full.
500 while (stream.avail_in != 0) {
501 if (stream.avail_out == 0) {
502 out->prependChain(addOutputBuffer(&stream, defaultBufferLength));
505 rc = deflate(&stream, Z_NO_FLUSH);
507 CHECK_EQ(rc, Z_OK) << stream.msg;
// All input has been supplied: keep calling deflate with Z_FINISH, adding
// output buffers as needed, until it stops returning Z_OK.
512 if (stream.avail_out == 0) {
513 out->prependChain(addOutputBuffer(&stream, defaultBufferLength));
516 rc = deflate(&stream, Z_FINISH);
517 } while (rc == Z_OK);
519 CHECK_EQ(rc, Z_STREAM_END) << stream.msg;
// Drop the unused space at the end of the last buffer in the chain.
521 out->prev()->trimEnd(stream.avail_out);
523 success = true; // we survived
// Decompresses the IOBuf chain `data` with zlib's inflate, returning a
// chain of output buffers.
//
// Throws std::runtime_error if inflateInit fails, if input remains after
// the end of the compressed stream, if doInflate() reports a data error, or
// if a caller-supplied `uncompressedLength` differs from the number of
// bytes actually produced.
528 std::unique_ptr<IOBuf> ZlibCodec::doUncompress(const IOBuf* data,
529 uint64_t uncompressedLength) {
// Null zalloc/zfree/opaque are passed to inflateInit, i.e. no custom
// allocator is supplied.
531 stream.zalloc = nullptr;
532 stream.zfree = nullptr;
533 stream.opaque = nullptr;
535 int rc = inflateInit(&stream);
// NOTE(review): the condition guarding this throw is not shown here;
// presumably rc != Z_OK -- confirm. Also confirm stream.msg is non-null on
// this path.
537 throw std::runtime_error(to<std::string>(
538 "ZlibCodec: inflateInit error: ", rc, ": ", stream.msg));
// Start with no input, no output space and zeroed byte counters; buffers
// are attached below.
541 stream.next_in = stream.next_out = nullptr;
542 stream.avail_in = stream.avail_out = 0;
543 stream.total_in = stream.total_out = 0;
// `success` is flipped at the very end; the cleanup check below uses it to
// tell normal completion from an early exit.
545 bool success = false;
// Stream teardown. Z_DATA_ERROR from inflateEnd is tolerated only when the
// function did not finish successfully.
// NOTE(review): presumably this runs from a scope guard on every exit path;
// the enclosing construct is not shown here -- confirm.
548 int rc = inflateEnd(&stream);
549 // If we're here because of an exception, it's okay if some data
551 CHECK(rc == Z_OK || (!success && rc == Z_DATA_ERROR))
552 << rc << ": " << stream.msg;
555 // Max 64MiB in one go
556 constexpr uint32_t maxSingleStepLength = uint32_t(64) << 20; // 64MiB
557 constexpr uint32_t defaultBufferLength = uint32_t(4) << 20; // 4MiB
// When the uncompressed length is known and at most 64MiB, the first output
// buffer is sized from it; otherwise a 4MiB buffer is used and more are
// chained on demand.
// NOTE(review): the "known length" operand of the conditional is not shown
// here -- confirm.
559 auto out = addOutputBuffer(
561 ((uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
562 uncompressedLength <= maxSingleStepLength) ?
564 defaultBufferLength));
// Set from doInflate()'s result; used to detect input that continues past
// the end of the compressed stream.
566 bool streamEnd = false;
567 for (auto& range : *data) {
// The const_cast is needed to assign the read-only input bytes to
// stream.next_in.
572 stream.next_in = const_cast<uint8_t*>(range.data());
573 stream.avail_in = range.size();
575 while (stream.avail_in != 0) {
// NOTE(review): the condition guarding this throw is not shown here;
// presumably it fires when streamEnd is already true while input remains
// -- confirm.
577 throw std::runtime_error(to<std::string>(
578 "ZlibCodec: junk after end of data"));
581 streamEnd = doInflate(&stream, out.get(), defaultBufferLength);
// A further inflate step after all input has been supplied.
// NOTE(review): presumably repeated until the stream reports its end; the
// enclosing loop is not shown here -- confirm.
586 streamEnd = doInflate(&stream, out.get(), defaultBufferLength);
// Drop the unused space at the end of the last buffer in the chain.
589 out->prev()->trimEnd(stream.avail_out);
// A caller-supplied length must match the total number of bytes zlib
// produced.
591 if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
592 uncompressedLength != stream.total_out) {
593 throw std::runtime_error(to<std::string>(
594 "ZlibCodec: invalid uncompressed length"));
597 success = true; // we survived
// Signature shared by every codec factory: takes a compression level and
// returns an owning pointer to the new Codec.
602 typedef std::unique_ptr<Codec> (*CodecFactory)(int);
// Factory table with one slot per CodecType value (NUM_CODEC_TYPES
// entries); getCodec() indexes it with the numeric value of the requested
// type.
// NOTE(review): only the first entry is shown here; the remaining entries
// must follow the order of the CodecType enumerators -- confirm.
604 CodecFactory gCodecFactories[
605 static_cast<size_t>(CodecType::NUM_CODEC_TYPES)] = {
606 NoCompressionCodec::create,
// Creates a codec of the requested `type` at the given `level` by looking
// up its factory in gCodecFactories.
//
// Throws std::invalid_argument if `type` is outside the table or has no
// usable factory. The factory itself may also throw std::invalid_argument
// for an unsupported level.
614 std::unique_ptr<Codec> getCodec(CodecType type, int level) {
615 size_t idx = static_cast<size_t>(type);
// Reject values at or beyond NUM_CODEC_TYPES before indexing the table.
616 if (idx >= static_cast<size_t>(CodecType::NUM_CODEC_TYPES)) {
617 throw std::invalid_argument(to<std::string>(
618 "Compression type ", idx, " not supported"));
620 auto factory = gCodecFactories[idx];
// NOTE(review): the condition guarding this second throw is not shown here;
// presumably it covers a null table entry -- confirm.
622 throw std::invalid_argument(to<std::string>(
623 "Compression type ", idx, " not supported"));
625 auto codec = (*factory)(level);
// Debug-only consistency check: the codec created must report the type
// that was asked for, i.e. the table order matches the enum.
626 DCHECK_EQ(static_cast<size_t>(codec->type()), idx);