--- /dev/null
+/*
+ * Copyright 2013 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "folly/io/Compression.h"
+
+#include <lz4.h>
+#include <lz4hc.h>
+#include <glog/logging.h>
+#include <snappy.h>
+#include <snappy-sinksource.h>
+#include <zlib.h>
+
+#include "folly/Conv.h"
+#include "folly/Memory.h"
+#include "folly/Portability.h"
+#include "folly/ScopeGuard.h"
+#include "folly/io/Cursor.h"
+
+namespace folly { namespace io {
+
+// Ensure consistent behavior in the nullptr case
+std::unique_ptr<IOBuf> Codec::compress(const IOBuf* data) {
+ return !data->empty() ? doCompress(data) : IOBuf::create(0);
+}
+
+std::unique_ptr<IOBuf> Codec::uncompress(const IOBuf* data,
+ uint64_t uncompressedLength) {
+ if (uncompressedLength == UNKNOWN_UNCOMPRESSED_LENGTH) {
+ if (needsUncompressedLength()) {
+ throw std::invalid_argument("Codec: uncompressed length required");
+ }
+ } else if (uncompressedLength > maxUncompressedLength()) {
+ throw std::runtime_error("Codec: uncompressed length too large");
+ }
+
+ if (data->empty()) {
+ if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
+ uncompressedLength != 0) {
+ throw std::runtime_error("Codec: invalid uncompressed length");
+ }
+ return IOBuf::create(0);
+ }
+
+ return doUncompress(data, uncompressedLength);
+}
+
+bool Codec::needsUncompressedLength() const {
+ return doNeedsUncompressedLength();
+}
+
+uint64_t Codec::maxUncompressedLength() const {
+ return doMaxUncompressedLength();
+}
+
+CodecType Codec::type() const {
+ return doType();
+}
+
+bool Codec::doNeedsUncompressedLength() const {
+ return false;
+}
+
+uint64_t Codec::doMaxUncompressedLength() const {
+ return std::numeric_limits<uint64_t>::max() - 1;
+}
+
+namespace {
+
+/**
+ * No compression
+ */
+class NoCompressionCodec FOLLY_FINAL : public Codec {
+ public:
+ static std::unique_ptr<Codec> create(int level);
+ explicit NoCompressionCodec(int level);
+
+ private:
+ CodecType doType() const FOLLY_OVERRIDE;
+ std::unique_ptr<IOBuf> doCompress(const IOBuf* data) FOLLY_OVERRIDE;
+ std::unique_ptr<IOBuf> doUncompress(
+ const IOBuf* data,
+ uint64_t uncompressedLength) FOLLY_OVERRIDE;
+};
+
+std::unique_ptr<Codec> NoCompressionCodec::create(int level) {
+ return make_unique<NoCompressionCodec>(level);
+}
+
+NoCompressionCodec::NoCompressionCodec(int level) {
+ switch (level) {
+ case COMPRESSION_LEVEL_DEFAULT:
+ case COMPRESSION_LEVEL_FASTEST:
+ case COMPRESSION_LEVEL_BEST:
+ level = 0;
+ }
+ if (level != 0) {
+ throw std::invalid_argument(to<std::string>(
+ "NoCompressionCodec: invalid level ", level));
+ }
+}
+
+CodecType NoCompressionCodec::doType() const {
+ return CodecType::NO_COMPRESSION;
+}
+
+std::unique_ptr<IOBuf> NoCompressionCodec::doCompress(
+ const IOBuf* data) {
+ return data->clone();
+}
+
+std::unique_ptr<IOBuf> NoCompressionCodec::doUncompress(
+ const IOBuf* data,
+ uint64_t uncompressedLength) {
+ if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
+ data->computeChainDataLength() != uncompressedLength) {
+ throw std::runtime_error(to<std::string>(
+ "NoCompressionCodec: invalid uncompressed length"));
+ }
+ return data->clone();
+}
+
+/**
+ * LZ4 compression
+ */
+class LZ4Codec FOLLY_FINAL : public Codec {
+ public:
+ static std::unique_ptr<Codec> create(int level);
+ explicit LZ4Codec(int level);
+
+ private:
+ bool doNeedsUncompressedLength() const FOLLY_OVERRIDE;
+ uint64_t doMaxUncompressedLength() const FOLLY_OVERRIDE;
+ CodecType doType() const FOLLY_OVERRIDE;
+ std::unique_ptr<IOBuf> doCompress(const IOBuf* data) FOLLY_OVERRIDE;
+ std::unique_ptr<IOBuf> doUncompress(
+ const IOBuf* data,
+ uint64_t uncompressedLength) FOLLY_OVERRIDE;
+
+ bool highCompression_;
+};
+
+std::unique_ptr<Codec> LZ4Codec::create(int level) {
+ return make_unique<LZ4Codec>(level);
+}
+
+LZ4Codec::LZ4Codec(int level) {
+ switch (level) {
+ case COMPRESSION_LEVEL_FASTEST:
+ case COMPRESSION_LEVEL_DEFAULT:
+ level = 1;
+ break;
+ case COMPRESSION_LEVEL_BEST:
+ level = 2;
+ break;
+ }
+ if (level < 1 || level > 2) {
+ throw std::invalid_argument(to<std::string>(
+ "LZ4Codec: invalid level: ", level));
+ }
+ highCompression_ = (level > 1);
+}
+
+bool LZ4Codec::doNeedsUncompressedLength() const {
+ return true;
+}
+
+uint64_t LZ4Codec::doMaxUncompressedLength() const {
+ // From lz4.h: "Max supported value is ~1.9GB"; I wish we had something
+ // more accurate.
+ return 1.8 * (uint64_t(1) << 30);
+}
+
+CodecType LZ4Codec::doType() const {
+ return CodecType::LZ4;
+}
+
+std::unique_ptr<IOBuf> LZ4Codec::doCompress(const IOBuf* data) {
+ std::unique_ptr<IOBuf> clone;
+ if (data->isChained()) {
+ // LZ4 doesn't support streaming, so we have to coalesce
+ clone = data->clone();
+ clone->coalesce();
+ data = clone.get();
+ }
+
+ auto out = IOBuf::create(LZ4_compressBound(data->length()));
+ int n;
+ if (highCompression_) {
+ n = LZ4_compress(reinterpret_cast<const char*>(data->data()),
+ reinterpret_cast<char*>(out->writableTail()),
+ data->length());
+ } else {
+ n = LZ4_compressHC(reinterpret_cast<const char*>(data->data()),
+ reinterpret_cast<char*>(out->writableTail()),
+ data->length());
+ }
+
+ CHECK_GE(n, 0);
+ CHECK_LE(n, out->capacity());
+
+ out->append(n);
+ return out;
+}
+
+std::unique_ptr<IOBuf> LZ4Codec::doUncompress(
+ const IOBuf* data,
+ uint64_t uncompressedLength) {
+ std::unique_ptr<IOBuf> clone;
+ if (data->isChained()) {
+ // LZ4 doesn't support streaming, so we have to coalesce
+ clone = data->clone();
+ clone->coalesce();
+ data = clone.get();
+ }
+
+ auto out = IOBuf::create(uncompressedLength);
+ int n = LZ4_uncompress(reinterpret_cast<const char*>(data->data()),
+ reinterpret_cast<char*>(out->writableTail()),
+ uncompressedLength);
+ if (n != data->length()) {
+ throw std::runtime_error(to<std::string>(
+ "LZ4 decompression returned invalid value ", n));
+ }
+ out->append(uncompressedLength);
+ return out;
+}
+
+/**
+ * Snappy compression
+ */
+
+/**
+ * Implementation of snappy::Source that reads from a IOBuf chain.
+ */
+class IOBufSnappySource FOLLY_FINAL : public snappy::Source {
+ public:
+ explicit IOBufSnappySource(const IOBuf* data);
+ size_t Available() const FOLLY_OVERRIDE;
+ const char* Peek(size_t* len) FOLLY_OVERRIDE;
+ void Skip(size_t n) FOLLY_OVERRIDE;
+ private:
+ size_t available_;
+ io::Cursor cursor_;
+};
+
+IOBufSnappySource::IOBufSnappySource(const IOBuf* data)
+ : available_(data->computeChainDataLength()),
+ cursor_(data) {
+}
+
+size_t IOBufSnappySource::Available() const {
+ return available_;
+}
+
+const char* IOBufSnappySource::Peek(size_t* len) {
+ auto p = cursor_.peek();
+ *len = p.second;
+ return reinterpret_cast<const char*>(p.first);
+}
+
+void IOBufSnappySource::Skip(size_t n) {
+ CHECK_LE(n, available_);
+ cursor_.skip(n);
+ available_ -= n;
+}
+
+class SnappyCodec FOLLY_FINAL : public Codec {
+ public:
+ static std::unique_ptr<Codec> create(int level);
+ explicit SnappyCodec(int level);
+
+ private:
+ uint64_t doMaxUncompressedLength() const FOLLY_OVERRIDE;
+ CodecType doType() const FOLLY_OVERRIDE;
+ std::unique_ptr<IOBuf> doCompress(const IOBuf* data) FOLLY_OVERRIDE;
+ std::unique_ptr<IOBuf> doUncompress(
+ const IOBuf* data,
+ uint64_t uncompressedLength) FOLLY_OVERRIDE;
+};
+
+std::unique_ptr<Codec> SnappyCodec::create(int level) {
+ return make_unique<SnappyCodec>(level);
+}
+
+SnappyCodec::SnappyCodec(int level) {
+ switch (level) {
+ case COMPRESSION_LEVEL_FASTEST:
+ case COMPRESSION_LEVEL_DEFAULT:
+ case COMPRESSION_LEVEL_BEST:
+ level = 1;
+ }
+ if (level != 1) {
+ throw std::invalid_argument(to<std::string>(
+ "SnappyCodec: invalid level: ", level));
+ }
+}
+
+uint64_t SnappyCodec::doMaxUncompressedLength() const {
+ // snappy.h uses uint32_t for lengths, so there's that.
+ return std::numeric_limits<uint32_t>::max();
+}
+
+CodecType SnappyCodec::doType() const {
+ return CodecType::SNAPPY;
+}
+
+std::unique_ptr<IOBuf> SnappyCodec::doCompress(const IOBuf* data) {
+ IOBufSnappySource source(data);
+ auto out =
+ IOBuf::create(snappy::MaxCompressedLength(source.Available()));
+
+ snappy::UncheckedByteArraySink sink(reinterpret_cast<char*>(
+ out->writableTail()));
+
+ size_t n = snappy::Compress(&source, &sink);
+
+ CHECK_LE(n, out->capacity());
+ out->append(n);
+ return out;
+}
+
+std::unique_ptr<IOBuf> SnappyCodec::doUncompress(const IOBuf* data,
+ uint64_t uncompressedLength) {
+ uint32_t actualUncompressedLength = 0;
+
+ {
+ IOBufSnappySource source(data);
+ if (!snappy::GetUncompressedLength(&source, &actualUncompressedLength)) {
+ throw std::runtime_error("snappy::GetUncompressedLength failed");
+ }
+ if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
+ uncompressedLength != actualUncompressedLength) {
+ throw std::runtime_error("snappy: invalid uncompressed length");
+ }
+ }
+
+ auto out = IOBuf::create(actualUncompressedLength);
+
+ {
+ IOBufSnappySource source(data);
+ if (!snappy::RawUncompress(&source,
+ reinterpret_cast<char*>(out->writableTail()))) {
+ throw std::runtime_error("snappy::RawUncompress failed");
+ }
+ }
+
+ out->append(actualUncompressedLength);
+ return out;
+}
+
+/**
+ * Zlib codec
+ */
+class ZlibCodec FOLLY_FINAL : public Codec {
+ public:
+ static std::unique_ptr<Codec> create(int level);
+ explicit ZlibCodec(int level);
+
+ private:
+ CodecType doType() const FOLLY_OVERRIDE;
+ std::unique_ptr<IOBuf> doCompress(const IOBuf* data) FOLLY_OVERRIDE;
+ std::unique_ptr<IOBuf> doUncompress(
+ const IOBuf* data,
+ uint64_t uncompressedLength) FOLLY_OVERRIDE;
+
+ std::unique_ptr<IOBuf> addOutputBuffer(z_stream* stream, uint32_t length);
+ bool doInflate(z_stream* stream, IOBuf* head, uint32_t bufferLength);
+
+ int level_;
+};
+
+std::unique_ptr<Codec> ZlibCodec::create(int level) {
+ return make_unique<ZlibCodec>(level);
+}
+
+ZlibCodec::ZlibCodec(int level) {
+ switch (level) {
+ case COMPRESSION_LEVEL_FASTEST:
+ level = 1;
+ break;
+ case COMPRESSION_LEVEL_DEFAULT:
+ level = Z_DEFAULT_COMPRESSION;
+ break;
+ case COMPRESSION_LEVEL_BEST:
+ level = 9;
+ break;
+ }
+ if (level != Z_DEFAULT_COMPRESSION && (level < 0 || level > 9)) {
+ throw std::invalid_argument(to<std::string>(
+ "ZlibCodec: invalid level: ", level));
+ }
+ level_ = level;
+}
+
+CodecType ZlibCodec::doType() const {
+ return CodecType::ZLIB;
+}
+
+std::unique_ptr<IOBuf> ZlibCodec::addOutputBuffer(z_stream* stream,
+ uint32_t length) {
+ CHECK_EQ(stream->avail_out, 0);
+
+ auto buf = IOBuf::create(length);
+ buf->append(length);
+
+ stream->next_out = buf->writableData();
+ stream->avail_out = buf->length();
+
+ return buf;
+}
+
+bool ZlibCodec::doInflate(z_stream* stream,
+ IOBuf* head,
+ uint32_t bufferLength) {
+ if (stream->avail_out == 0) {
+ head->prependChain(addOutputBuffer(stream, bufferLength));
+ }
+
+ int rc = inflate(stream, Z_NO_FLUSH);
+
+ switch (rc) {
+ case Z_OK:
+ break;
+ case Z_STREAM_END:
+ return true;
+ case Z_BUF_ERROR:
+ case Z_NEED_DICT:
+ case Z_DATA_ERROR:
+ case Z_MEM_ERROR:
+ throw std::runtime_error(to<std::string>(
+ "ZlibCodec: inflate error: ", rc, ": ", stream->msg));
+ default:
+ CHECK(false) << rc << ": " << stream->msg;
+ }
+
+ return false;
+}
+
+
+std::unique_ptr<IOBuf> ZlibCodec::doCompress(const IOBuf* data) {
+ z_stream stream;
+ stream.zalloc = nullptr;
+ stream.zfree = nullptr;
+ stream.opaque = nullptr;
+
+ int rc = deflateInit(&stream, level_);
+ if (rc != Z_OK) {
+ throw std::runtime_error(to<std::string>(
+ "ZlibCodec: deflateInit error: ", rc, ": ", stream.msg));
+ }
+
+ stream.next_in = stream.next_out = nullptr;
+ stream.avail_in = stream.avail_out = 0;
+ stream.total_in = stream.total_out = 0;
+
+ bool success = false;
+
+ SCOPE_EXIT {
+ int rc = deflateEnd(&stream);
+ // If we're here because of an exception, it's okay if some data
+ // got dropped.
+ CHECK(rc == Z_OK || (!success && rc == Z_DATA_ERROR))
+ << rc << ": " << stream.msg;
+ };
+
+ uint64_t uncompressedLength = data->computeChainDataLength();
+ uint64_t maxCompressedLength = deflateBound(&stream, uncompressedLength);
+
+ // Max 64MiB in one go
+ constexpr uint32_t maxSingleStepLength = uint32_t(64) << 20; // 64MiB
+ constexpr uint32_t defaultBufferLength = uint32_t(4) << 20; // 4MiB
+
+ auto out = addOutputBuffer(
+ &stream,
+ (maxCompressedLength <= maxSingleStepLength ?
+ maxCompressedLength :
+ defaultBufferLength));
+
+ for (auto& range : *data) {
+ if (range.empty()) {
+ continue;
+ }
+
+ stream.next_in = const_cast<uint8_t*>(range.data());
+ stream.avail_in = range.size();
+
+ while (stream.avail_in != 0) {
+ if (stream.avail_out == 0) {
+ out->prependChain(addOutputBuffer(&stream, defaultBufferLength));
+ }
+
+ rc = deflate(&stream, Z_NO_FLUSH);
+
+ CHECK_EQ(rc, Z_OK) << stream.msg;
+ }
+ }
+
+ do {
+ if (stream.avail_out == 0) {
+ out->prependChain(addOutputBuffer(&stream, defaultBufferLength));
+ }
+
+ rc = deflate(&stream, Z_FINISH);
+ } while (rc == Z_OK);
+
+ CHECK_EQ(rc, Z_STREAM_END) << stream.msg;
+
+ out->prev()->trimEnd(stream.avail_out);
+
+ success = true; // we survived
+
+ return out;
+}
+
+std::unique_ptr<IOBuf> ZlibCodec::doUncompress(const IOBuf* data,
+ uint64_t uncompressedLength) {
+ z_stream stream;
+ stream.zalloc = nullptr;
+ stream.zfree = nullptr;
+ stream.opaque = nullptr;
+
+ int rc = inflateInit(&stream);
+ if (rc != Z_OK) {
+ throw std::runtime_error(to<std::string>(
+ "ZlibCodec: inflateInit error: ", rc, ": ", stream.msg));
+ }
+
+ stream.next_in = stream.next_out = nullptr;
+ stream.avail_in = stream.avail_out = 0;
+ stream.total_in = stream.total_out = 0;
+
+ bool success = false;
+
+ SCOPE_EXIT {
+ int rc = inflateEnd(&stream);
+ // If we're here because of an exception, it's okay if some data
+ // got dropped.
+ CHECK(rc == Z_OK || (!success && rc == Z_DATA_ERROR))
+ << rc << ": " << stream.msg;
+ };
+
+ // Max 64MiB in one go
+ constexpr uint32_t maxSingleStepLength = uint32_t(64) << 20; // 64MiB
+ constexpr uint32_t defaultBufferLength = uint32_t(4) << 20; // 4MiB
+
+ auto out = addOutputBuffer(
+ &stream,
+ ((uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
+ uncompressedLength <= maxSingleStepLength) ?
+ uncompressedLength :
+ defaultBufferLength));
+
+ bool streamEnd = false;
+ for (auto& range : *data) {
+ if (range.empty()) {
+ continue;
+ }
+
+ stream.next_in = const_cast<uint8_t*>(range.data());
+ stream.avail_in = range.size();
+
+ while (stream.avail_in != 0) {
+ if (streamEnd) {
+ throw std::runtime_error(to<std::string>(
+ "ZlibCodec: junk after end of data"));
+ }
+
+ streamEnd = doInflate(&stream, out.get(), defaultBufferLength);
+ }
+ }
+
+ while (!streamEnd) {
+ streamEnd = doInflate(&stream, out.get(), defaultBufferLength);
+ }
+
+ out->prev()->trimEnd(stream.avail_out);
+
+ if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
+ uncompressedLength != stream.total_out) {
+ throw std::runtime_error(to<std::string>(
+ "ZlibCodec: invalid uncompressed length"));
+ }
+
+ success = true; // we survived
+
+ return out;
+}
+
+typedef std::unique_ptr<Codec> (*CodecFactory)(int);
+
+CodecFactory gCodecFactories[
+ static_cast<size_t>(CodecType::NUM_CODEC_TYPES)] = {
+ NoCompressionCodec::create,
+ LZ4Codec::create,
+ SnappyCodec::create,
+ ZlibCodec::create
+};
+
+} // namespace
+
+std::unique_ptr<Codec> getCodec(CodecType type, int level) {
+ size_t idx = static_cast<size_t>(type);
+ if (idx >= static_cast<size_t>(CodecType::NUM_CODEC_TYPES)) {
+ throw std::invalid_argument(to<std::string>(
+ "Compression type ", idx, " not supported"));
+ }
+ auto factory = gCodecFactories[idx];
+ if (!factory) {
+ throw std::invalid_argument(to<std::string>(
+ "Compression type ", idx, " not supported"));
+ }
+ auto codec = (*factory)(level);
+ DCHECK_EQ(static_cast<size_t>(codec->type()), idx);
+ return codec;
+}
+
+}} // namespaces
+
--- /dev/null
+/*
+ * Copyright 2013 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FOLLY_IO_COMPRESSION_H_
+#define FOLLY_IO_COMPRESSION_H_
+
+#include <cstdint>
+#include <limits>
+#include <memory>
+
+#include "folly/io/IOBuf.h"
+
+/**
+ * Compression / decompression over IOBufs
+ */
+
+namespace folly { namespace io {
+
+enum class CodecType {
+ /**
+ * Use no compression.
+ * Levels supported: 0
+ */
+ NO_COMPRESSION = 0,
+
+ /**
+ * Use LZ4 compression.
+ * Levels supported: 1 = fast, 2 = best; default = 1
+ */
+ LZ4 = 1,
+
+ /**
+ * Use Snappy compression.
+ * Levels supported: 1
+ */
+ SNAPPY = 2,
+
+ /**
+ * Use zlib compression.
+ * Levels supported: 0 = no compression, 1 = fast, ..., 9 = best; default = 6
+ */
+ ZLIB = 3,
+
+ NUM_CODEC_TYPES = 4,
+};
+
+class Codec {
+ public:
+ virtual ~Codec() { }
+
+ /**
+ * Return the maximum length of data that may be compressed with this codec.
+ * NO_COMPRESSION and ZLIB support arbitrary lengths;
+ * LZ4 supports up to 1.9GiB; SNAPPY supports up to 4GiB.
+ */
+ uint64_t maxUncompressedLength() const;
+
+ /**
+ * Return the codec's type.
+ */
+ CodecType type() const;
+
+ /**
+ * Does this codec need the exact uncompressed length on decompression?
+ */
+ bool needsUncompressedLength() const;
+
+ /**
+ * Compress data, returning an IOBuf (which may share storage with data).
+ * Throws std::invalid_argument if data is larger than
+ * maxUncompressedLength().
+ *
+ * Regardless of the behavior of the underlying compressor, compressing
+ * an empty IOBuf chain will return an empty IOBuf chain.
+ */
+ std::unique_ptr<IOBuf> compress(const folly::IOBuf* data);
+
+ /**
+ * Uncompress data. Throws std::runtime_error on decompression error.
+ *
+ * Some codecs (LZ4) require the exact uncompressed length; this is indicated
+ * by needsUncompressedLength().
+ *
+ * For other codes (zlib), knowing the exact uncompressed length ahead of
+ * time might be faster.
+ *
+ * Regardless of the behavior of the underlying compressor, uncompressing
+ * an empty IOBuf chain will return an empty IOBuf chain.
+ */
+ static constexpr uint64_t UNKNOWN_UNCOMPRESSED_LENGTH = uint64_t(-1);
+
+ std::unique_ptr<IOBuf> uncompress(
+ const IOBuf* data,
+ uint64_t uncompressedLength = UNKNOWN_UNCOMPRESSED_LENGTH);
+
+ private:
+ // default: no limits (save for special value UNKNOWN_UNCOMPRESSED_LENGTH)
+ virtual uint64_t doMaxUncompressedLength() const;
+ // default: doesn't need uncompressed length
+ virtual bool doNeedsUncompressedLength() const;
+ virtual CodecType doType() const = 0;
+ virtual std::unique_ptr<IOBuf> doCompress(const folly::IOBuf* data) = 0;
+ virtual std::unique_ptr<IOBuf> doUncompress(const folly::IOBuf* data,
+ uint64_t uncompressedLength) = 0;
+};
+
+constexpr int COMPRESSION_LEVEL_FASTEST = -1;
+constexpr int COMPRESSION_LEVEL_DEFAULT = -2;
+constexpr int COMPRESSION_LEVEL_BEST = -3;
+
+/**
+ * Return a codec for the given type. Throws on error. The level
+ * is a non-negative codec-dependent integer indicating the level of
+ * compression desired, or one of the following constants:
+ *
+ * COMPRESSION_LEVEL_FASTEST is fastest (uses least CPU / memory,
+ * worst compression)
+ * COMPRESSION_LEVEL_DEFAULT is the default (likely a tradeoff between
+ * FASTEST and BEST)
+ * COMPRESSION_LEVEL_BEST is the best compression (uses most CPU / memory,
+ * best compression)
+ */
+std::unique_ptr<Codec> getCodec(CodecType type,
+ int level = COMPRESSION_LEVEL_DEFAULT);
+
+}} // namespaces
+
+#endif /* FOLLY_IO_COMPRESSION_H_ */
+
--- /dev/null
+/*
+ * Copyright 2013 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "folly/io/Compression.h"
+
+// Yes, tr1, as that's what gtest requires
+#include <random>
+#include <thread>
+#include <tr1/tuple>
+#include <unordered_map>
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "folly/Benchmark.h"
+#include "folly/Hash.h"
+#include "folly/Random.h"
+#include "folly/io/IOBufQueue.h"
+
+namespace folly { namespace io { namespace test {
+
+constexpr size_t randomDataSizeLog2 = 27; // 128MiB
+constexpr size_t randomDataSize = size_t(1) << randomDataSizeLog2;
+
+std::unique_ptr<uint8_t[]> randomData;
+std::unordered_map<uint64_t, uint64_t> hashes;
+
+uint64_t hashIOBuf(const IOBuf* buf) {
+ uint64_t h = folly::hash::FNV_64_HASH_START;
+ for (auto& range : *buf) {
+ h = folly::hash::fnv64_buf(range.data(), range.size(), h);
+ }
+ return h;
+}
+
+uint64_t getRandomDataHash(uint64_t size) {
+ auto p = hashes.find(size);
+ if (p != hashes.end()) {
+ return p->second;
+ }
+
+ uint64_t h = folly::hash::fnv64_buf(randomData.get(), size);
+ hashes[size] = h;
+ return h;
+}
+
+void generateRandomData() {
+ randomData.reset(new uint8_t[size_t(1) << randomDataSizeLog2]);
+
+ constexpr size_t numThreadsLog2 = 3;
+ constexpr size_t numThreads = size_t(1) << numThreadsLog2;
+
+ uint32_t seed = randomNumberSeed();
+
+ std::vector<std::thread> threads;
+ threads.reserve(numThreads);
+ for (size_t t = 0; t < numThreads; ++t) {
+ threads.emplace_back(
+ [seed, t, numThreadsLog2] () {
+ std::mt19937 rng(seed + t);
+ size_t countLog2 = size_t(1) << (randomDataSizeLog2 - numThreadsLog2);
+ size_t start = size_t(t) << countLog2;
+ for (size_t i = 0; i < countLog2; ++i) {
+ randomData[start + i] = rng();
+ }
+ });
+ }
+
+ for (auto& t : threads) {
+ t.join();
+ }
+}
+
+class CompressionTest : public testing::TestWithParam<
+ std::tr1::tuple<int, CodecType>> {
+ protected:
+ void SetUp() {
+ auto tup = GetParam();
+ uncompressedLength_ = uint64_t(1) << std::tr1::get<0>(tup);
+ codec_ = getCodec(std::tr1::get<1>(tup));
+ }
+
+ uint64_t uncompressedLength_;
+ std::unique_ptr<Codec> codec_;
+};
+
+TEST_P(CompressionTest, Simple) {
+ auto original = IOBuf::wrapBuffer(randomData.get(), uncompressedLength_);
+ auto compressed = codec_->compress(original.get());
+ if (!codec_->needsUncompressedLength()) {
+ auto uncompressed = codec_->uncompress(compressed.get());
+ EXPECT_EQ(uncompressedLength_, uncompressed->computeChainDataLength());
+ EXPECT_EQ(getRandomDataHash(uncompressedLength_),
+ hashIOBuf(uncompressed.get()));
+ }
+ {
+ auto uncompressed = codec_->uncompress(compressed.get(),
+ uncompressedLength_);
+ EXPECT_EQ(uncompressedLength_, uncompressed->computeChainDataLength());
+ EXPECT_EQ(getRandomDataHash(uncompressedLength_),
+ hashIOBuf(uncompressed.get()));
+ }
+}
+
+INSTANTIATE_TEST_CASE_P(
+ CompressionTest,
+ CompressionTest,
+ testing::Combine(
+ testing::Values(0, 1, 12, 22, int(randomDataSizeLog2)),
+ testing::Values(CodecType::NO_COMPRESSION,
+ CodecType::LZ4,
+ CodecType::SNAPPY,
+ CodecType::ZLIB)));
+
+class CompressionCorruptionTest : public testing::TestWithParam<CodecType> {
+ protected:
+ void SetUp() {
+ codec_ = getCodec(GetParam());
+ }
+
+ std::unique_ptr<Codec> codec_;
+};
+
+TEST_P(CompressionCorruptionTest, Simple) {
+ constexpr uint64_t uncompressedLength = 42;
+ auto original = IOBuf::wrapBuffer(randomData.get(), uncompressedLength);
+ auto compressed = codec_->compress(original.get());
+
+ if (!codec_->needsUncompressedLength()) {
+ auto uncompressed = codec_->uncompress(compressed.get());
+ EXPECT_EQ(uncompressedLength, uncompressed->computeChainDataLength());
+ EXPECT_EQ(getRandomDataHash(uncompressedLength),
+ hashIOBuf(uncompressed.get()));
+ }
+ {
+ auto uncompressed = codec_->uncompress(compressed.get(),
+ uncompressedLength);
+ EXPECT_EQ(uncompressedLength, uncompressed->computeChainDataLength());
+ EXPECT_EQ(getRandomDataHash(uncompressedLength),
+ hashIOBuf(uncompressed.get()));
+ }
+
+ EXPECT_THROW(codec_->uncompress(compressed.get(), uncompressedLength + 1),
+ std::runtime_error);
+
+ // Corrupt the first character
+ ++(compressed->writableData()[0]);
+
+ if (!codec_->needsUncompressedLength()) {
+ EXPECT_THROW(codec_->uncompress(compressed.get()),
+ std::runtime_error);
+ }
+
+ EXPECT_THROW(codec_->uncompress(compressed.get(), uncompressedLength),
+ std::runtime_error);
+}
+
+INSTANTIATE_TEST_CASE_P(
+ CompressionCorruptionTest,
+ CompressionCorruptionTest,
+ testing::Values(
+ // NO_COMPRESSION can't detect corruption
+ // LZ4 can't detect corruption reliably (sigh)
+ CodecType::SNAPPY,
+ CodecType::ZLIB));
+
+}}} // namespaces
+
+int main(int argc, char *argv[]) {
+ testing::InitGoogleTest(&argc, argv);
+ google::ParseCommandLineFlags(&argc, &argv, true);
+
+ folly::io::test::generateRandomData(); // 4GB
+
+ auto ret = RUN_ALL_TESTS();
+ if (!ret) {
+ folly::runBenchmarksOnFlag();
+ }
+ return ret;
+}
+