#include <folly/Conv.h>
#include <folly/Memory.h>
#include <folly/Portability.h>
+#include <folly/Random.h>
#include <folly/ScopeGuard.h>
#include <folly/Varint.h>
#include <folly/compression/Utils.h>
#include <folly/io/Cursor.h>
#include <folly/lang/Bits.h>
+#include <folly/stop_watch.h>
#include <algorithm>
#include <unordered_set>
namespace folly {
namespace io {
-Codec::Codec(CodecType type) : type_(type) { }
// Constructs the codec base and stores `type` in type_. When `counters` is
// true, each of the eight statistics counters is assigned a counter built
// from (type, name, level), its own CompressionCounterKey, and
// CompressionCounterType::SUM. When `counters` is false none of them is
// assigned here.
+Codec::Codec(
+    CodecType type,
+    Optional<int> level,
+    StringPiece name,
+    bool counters)
+    : type_(type) {
+  if (!counters) {
+    return;
+  }
+
  // Every counter shares the same identity and aggregation; only the key
  // differs, so build them through one local factory.
+  auto const counterFor = [&](CompressionCounterKey key) {
+    return folly::detail::CompressionCounter{
+        type, name, level, key, CompressionCounterType::SUM};
+  };
+
+  bytesBeforeCompression_ =
+      counterFor(CompressionCounterKey::BYTES_BEFORE_COMPRESSION);
+  bytesAfterCompression_ =
+      counterFor(CompressionCounterKey::BYTES_AFTER_COMPRESSION);
+  bytesBeforeDecompression_ =
+      counterFor(CompressionCounterKey::BYTES_BEFORE_DECOMPRESSION);
+  bytesAfterDecompression_ =
+      counterFor(CompressionCounterKey::BYTES_AFTER_DECOMPRESSION);
+  compressions_ = counterFor(CompressionCounterKey::COMPRESSIONS);
+  decompressions_ = counterFor(CompressionCounterKey::DECOMPRESSIONS);
+  compressionMilliseconds_ =
+      counterFor(CompressionCounterKey::COMPRESSION_MILLISECONDS);
+  decompressionMilliseconds_ =
+      counterFor(CompressionCounterKey::DECOMPRESSION_MILLISECONDS);
+}
+
+namespace {
// Sampling parameter handed to folly::Random::oneIn() by the Codec
// compress/uncompress entry points to decide whether a call is logged.
+constexpr uint32_t kLoggingRate = 50;
+
// Scope timer: on destruction it adds the stop watch's elapsed millisecond
// count to the counter supplied at construction. The counter is held by
// pointer and must outlive the Timer.
// NOTE(review): the class is implicitly copyable, and every copy (including
// temporaries) performs its own `+=` when it is destroyed.
+class Timer {
+ public:
+  explicit Timer(folly::detail::CompressionCounter& counter)
+      : sink_(&counter) {}
+
+  ~Timer() {
+    auto const elapsedMs = watch_.elapsed().count();
+    *sink_ += elapsedMs;
+  }
+
+ private:
+  folly::detail::CompressionCounter* sink_;
+  stop_watch<std::chrono::milliseconds> watch_;
+};
+} // namespace
// Ensure consistent behavior in the nullptr case
//
// Compresses the IOBuf chain `data` by delegating to doCompress().
// Throws std::invalid_argument when `data` is nullptr and std::runtime_error
// when the chain's data length exceeds maxUncompressedLength().
// Calls selected by folly::Random::oneIn(kLoggingRate) additionally update
// the compression counters (call count, input bytes, output bytes, time).
std::unique_ptr<IOBuf> Codec::compress(const IOBuf* data) {
if (data == nullptr) {
throw std::invalid_argument("Codec: data must not be nullptr");
}
- uint64_t len = data->computeChainDataLength();
+ const uint64_t len = data->computeChainDataLength();
if (len > maxUncompressedLength()) {
throw std::runtime_error("Codec: uncompressed length too large");
}
-
- return doCompress(data);
// Decide once per call whether this invocation is logged.
+ bool const logging = folly::Random::oneIn(kLoggingRate);
// When logging, the Timer adds its elapsed milliseconds to
// compressionMilliseconds_ when it is destroyed; otherwise the Optional
// stays empty and nothing is timed.
+ folly::Optional<Timer> const timer =
+ logging ? Timer(compressionMilliseconds_) : folly::Optional<Timer>();
+ auto result = doCompress(data);
+ if (logging) {
+ compressions_++;
+ bytesBeforeCompression_ += len;
+ bytesAfterCompression_ += result->computeChainDataLength();
+ }
+ return result;
}
// String overload of compress(): delegates to doCompressString() and throws
// std::runtime_error when `len` exceeds maxUncompressedLength().
// Calls selected by folly::Random::oneIn(kLoggingRate) additionally update
// the compression counters (call count, input bytes, output bytes, time).
// NOTE(review): `len` is not declared in the lines visible here; its
// definition presumably sits in context omitted from this excerpt — confirm.
std::string Codec::compress(const StringPiece data) {
if (len > maxUncompressedLength()) {
throw std::runtime_error("Codec: uncompressed length too large");
}
-
- return doCompressString(data);
// Decide once per call whether this invocation is logged.
+ bool const logging = folly::Random::oneIn(kLoggingRate);
// When logging, the Timer adds its elapsed milliseconds to
// compressionMilliseconds_ when it is destroyed.
+ folly::Optional<Timer> const timer =
+ logging ? Timer(compressionMilliseconds_) : folly::Optional<Timer>();
+ auto result = doCompressString(data);
+ if (logging) {
+ compressions_++;
+ bytesBeforeCompression_ += len;
+ bytesAfterCompression_ += result.size();
+ }
+ return result;
}
std::unique_ptr<IOBuf> Codec::uncompress(
return IOBuf::create(0);
}
- return doUncompress(data, uncompressedLength);
+ bool const logging = folly::Random::oneIn(kLoggingRate);
+ folly::Optional<Timer> const timer =
+ logging ? Timer(decompressionMilliseconds_) : folly::Optional<Timer>();
+ auto result = doUncompress(data, uncompressedLength);
+ if (logging) {
+ decompressions_++;
+ bytesBeforeDecompression_ += data->computeChainDataLength();
+ bytesAfterDecompression_ += result->computeChainDataLength();
+ }
+ return result;
}
std::string Codec::uncompress(
return "";
}
- return doUncompressString(data, uncompressedLength);
+ bool const logging = folly::Random::oneIn(kLoggingRate);
+ folly::Optional<Timer> const timer =
+ logging ? Timer(decompressionMilliseconds_) : folly::Optional<Timer>();
+ auto result = doUncompressString(data, uncompressedLength);
+ if (logging) {
+ decompressions_++;
+ bytesBeforeDecompression_ += data.size();
+ bytesAfterDecompression_ += result.size();
+ }
+ return result;
}
bool Codec::needsUncompressedLength() const {
return std::make_unique<LZ4Codec>(level, type);
}
-LZ4Codec::LZ4Codec(int level, CodecType type) : Codec(type) {
- DCHECK(type == CodecType::LZ4 || type == CodecType::LZ4_VARINT_SIZE);
-
// Normalizes a caller-supplied compression level to one of LZ4's two
// internal levels:
//   1 - also selected by COMPRESSION_LEVEL_FASTEST / COMPRESSION_LEVEL_DEFAULT
//   2 - also selected by COMPRESSION_LEVEL_BEST
// Any other value throws std::invalid_argument.
//
// The return type must be int. With a bool return both 1 and 2 convert to
// `true`, so a caller's `lz4ConvertLevel(level) > 1` test could never hold
// and the level forwarded to the Codec base would always read as 1.
+static int lz4ConvertLevel(int level) {
switch (level) {
+ case 1:
case COMPRESSION_LEVEL_FASTEST:
case COMPRESSION_LEVEL_DEFAULT:
- level = 1;
- break;
+ return 1;
+ case 2:
case COMPRESSION_LEVEL_BEST:
- level = 2;
- break;
- }
- if (level < 1 || level > 2) {
- throw std::invalid_argument(to<std::string>(
- "LZ4Codec: invalid level: ", level));
+ return 2;
}
- highCompression_ = (level > 1);
// Reached only when no case above matched, i.e. the level is unsupported.
+ throw std::invalid_argument(
+ to<std::string>("LZ4Codec: invalid level: ", level));
+}
+
// Normalizes `level` through lz4ConvertLevel() (which throws
// std::invalid_argument for an unsupported level), forwards the normalized
// level to the Codec base, and sets highCompression_ when that level is
// greater than 1. `type` is DCHECKed to be LZ4 or LZ4_VARINT_SIZE.
+LZ4Codec::LZ4Codec(int level, CodecType type)
+ : Codec(type, lz4ConvertLevel(level)),
+ highCompression_(lz4ConvertLevel(level) > 1) {
+ DCHECK(type == CodecType::LZ4 || type == CodecType::LZ4_VARINT_SIZE);
}
bool LZ4Codec::doNeedsUncompressedLength() const {
dirty_ = false;
}
-LZ4FrameCodec::LZ4FrameCodec(int level, CodecType type) : Codec(type) {
- DCHECK(type == CodecType::LZ4_FRAME);
// Translates the symbolic compression levels for the LZ4 frame codec:
// COMPRESSION_LEVEL_FASTEST and COMPRESSION_LEVEL_DEFAULT become 0,
// COMPRESSION_LEVEL_BEST becomes 16. Any other value is returned unchanged;
// no range validation is performed here.
+static int lz4fConvertLevel(int level) {
switch (level) {
case COMPRESSION_LEVEL_FASTEST:
case COMPRESSION_LEVEL_DEFAULT:
- level_ = 0;
- break;
+ return 0;
case COMPRESSION_LEVEL_BEST:
- level_ = 16;
- break;
- default:
- level_ = level;
- break;
+ return 16;
}
// Not a symbolic level: pass the caller's numeric level through as-is.
+ return level;
+}
+
// Converts `level` with lz4fConvertLevel(), forwarding the converted value
// to the Codec base and storing it in level_. `type` is DCHECKed to be
// LZ4_FRAME.
+LZ4FrameCodec::LZ4FrameCodec(int level, CodecType type)
+ : Codec(type, lz4fConvertLevel(level)), level_(lz4fConvertLevel(level)) {
+ DCHECK(type == CodecType::LZ4_FRAME);
}
LZ4FrameCodec::~LZ4FrameCodec() {
return make_unique<ZSTDStreamCodec>(level, type);
}
-ZSTDStreamCodec::ZSTDStreamCodec(int level, CodecType type)
- : StreamCodec(type) {
- DCHECK(type == CodecType::ZSTD);
// Translates a caller-supplied level to a concrete ZSTD level:
// COMPRESSION_LEVEL_FASTEST and COMPRESSION_LEVEL_DEFAULT become 1,
// COMPRESSION_LEVEL_BEST becomes 19. Any other value must lie in
// [1, ZSTD_maxCLevel()] and is returned unchanged; otherwise
// std::invalid_argument is thrown.
+static int zstdConvertLevel(int level) {
switch (level) {
case COMPRESSION_LEVEL_FASTEST:
- level = 1;
- break;
+ return 1;
case COMPRESSION_LEVEL_DEFAULT:
- level = 1;
- break;
+ return 1;
case COMPRESSION_LEVEL_BEST:
- level = 19;
- break;
+ return 19;
}
// Only explicit numeric levels reach this range check.
if (level < 1 || level > ZSTD_maxCLevel()) {
throw std::invalid_argument(
to<std::string>("ZSTD: invalid level: ", level));
}
- level_ = level;
+ return level;
+}
+
// Converts `level` with zstdConvertLevel() (which throws
// std::invalid_argument for an out-of-range level), forwarding the converted
// value to the StreamCodec base and storing it in level_. `type` is DCHECKed
// to be ZSTD.
+ZSTDStreamCodec::ZSTDStreamCodec(int level, CodecType type)
+ : StreamCodec(type, zstdConvertLevel(level)),
+ level_(zstdConvertLevel(level)) {
+ DCHECK(type == CodecType::ZSTD);
}
bool ZSTDStreamCodec::doNeedsUncompressedLength() const {
AutomaticCodec::AutomaticCodec(
std::vector<std::unique_ptr<Codec>> customCodecs,
std::unique_ptr<Codec> terminalCodec)
- : Codec(CodecType::USER_DEFINED),
+ : Codec(CodecType::USER_DEFINED, folly::none, "auto"),
codecs_(std::move(customCodecs)),
terminalCodec_(std::move(terminalCodec)) {
// Fastest -> slowest