Log (de)compression bytes
[folly.git] / folly / compression / Compression.cpp
index 4ad9f7956792f1e278fe5d73704222517fe34dc5..efcd7ed43f01555f414571604319310b41813723 100644 (file)
 #include <folly/Conv.h>
 #include <folly/Memory.h>
 #include <folly/Portability.h>
+#include <folly/Random.h>
 #include <folly/ScopeGuard.h>
 #include <folly/Varint.h>
 #include <folly/compression/Utils.h>
 #include <folly/io/Cursor.h>
 #include <folly/lang/Bits.h>
+#include <folly/stop_watch.h>
 #include <algorithm>
 #include <unordered_set>
 
@@ -65,19 +67,96 @@ using folly::io::compression::detail::prefixToStringLE;
 namespace folly {
 namespace io {
 
-Codec::Codec(CodecType type) : type_(type) { }
+Codec::Codec(
+    CodecType type,
+    Optional<int> level,
+    StringPiece name,
+    bool counters)
+    : type_(type) {
+  if (counters) {
+    bytesBeforeCompression_ = {type,
+                               name,
+                               level,
+                               CompressionCounterKey::BYTES_BEFORE_COMPRESSION,
+                               CompressionCounterType::SUM};
+    bytesAfterCompression_ = {type,
+                              name,
+                              level,
+                              CompressionCounterKey::BYTES_AFTER_COMPRESSION,
+                              CompressionCounterType::SUM};
+    bytesBeforeDecompression_ = {
+        type,
+        name,
+        level,
+        CompressionCounterKey::BYTES_BEFORE_DECOMPRESSION,
+        CompressionCounterType::SUM};
+    bytesAfterDecompression_ = {
+        type,
+        name,
+        level,
+        CompressionCounterKey::BYTES_AFTER_DECOMPRESSION,
+        CompressionCounterType::SUM};
+    compressions_ = {type,
+                     name,
+                     level,
+                     CompressionCounterKey::COMPRESSIONS,
+                     CompressionCounterType::SUM};
+    decompressions_ = {type,
+                       name,
+                       level,
+                       CompressionCounterKey::DECOMPRESSIONS,
+                       CompressionCounterType::SUM};
+    compressionMilliseconds_ = {type,
+                                name,
+                                level,
+                                CompressionCounterKey::COMPRESSION_MILLISECONDS,
+                                CompressionCounterType::SUM};
+    decompressionMilliseconds_ = {
+        type,
+        name,
+        level,
+        CompressionCounterKey::DECOMPRESSION_MILLISECONDS,
+        CompressionCounterType::SUM};
+  }
+}
+
+namespace {
+constexpr uint32_t kLoggingRate = 50;
+
+class Timer {
+ public:
+  explicit Timer(folly::detail::CompressionCounter& counter)
+      : counter_(&counter) {}
+
+  ~Timer() {
+    *counter_ += timer_.elapsed().count();
+  }
+
+ private:
+  folly::detail::CompressionCounter* counter_;
+  stop_watch<std::chrono::milliseconds> timer_;
+};
+} // namespace
 
 // Ensure consistent behavior in the nullptr case
 std::unique_ptr<IOBuf> Codec::compress(const IOBuf* data) {
   if (data == nullptr) {
     throw std::invalid_argument("Codec: data must not be nullptr");
   }
-  uint64_t len = data->computeChainDataLength();
+  const uint64_t len = data->computeChainDataLength();
   if (len > maxUncompressedLength()) {
     throw std::runtime_error("Codec: uncompressed length too large");
   }
-
-  return doCompress(data);
+  bool const logging = folly::Random::oneIn(kLoggingRate);
+  folly::Optional<Timer> const timer =
+      logging ? Timer(compressionMilliseconds_) : folly::Optional<Timer>();
+  auto result = doCompress(data);
+  if (logging) {
+    compressions_++;
+    bytesBeforeCompression_ += len;
+    bytesAfterCompression_ += result->computeChainDataLength();
+  }
+  return result;
 }
 
 std::string Codec::compress(const StringPiece data) {
@@ -85,8 +164,16 @@ std::string Codec::compress(const StringPiece data) {
   if (len > maxUncompressedLength()) {
     throw std::runtime_error("Codec: uncompressed length too large");
   }
-
-  return doCompressString(data);
+  bool const logging = folly::Random::oneIn(kLoggingRate);
+  folly::Optional<Timer> const timer =
+      logging ? Timer(compressionMilliseconds_) : folly::Optional<Timer>();
+  auto result = doCompressString(data);
+  if (logging) {
+    compressions_++;
+    bytesBeforeCompression_ += len;
+    bytesAfterCompression_ += result.size();
+  }
+  return result;
 }
 
 std::unique_ptr<IOBuf> Codec::uncompress(
@@ -110,7 +197,16 @@ std::unique_ptr<IOBuf> Codec::uncompress(
     return IOBuf::create(0);
   }
 
-  return doUncompress(data, uncompressedLength);
+  bool const logging = folly::Random::oneIn(kLoggingRate);
+  folly::Optional<Timer> const timer =
+      logging ? Timer(decompressionMilliseconds_) : folly::Optional<Timer>();
+  auto result = doUncompress(data, uncompressedLength);
+  if (logging) {
+    decompressions_++;
+    bytesBeforeDecompression_ += data->computeChainDataLength();
+    bytesAfterDecompression_ += result->computeChainDataLength();
+  }
+  return result;
 }
 
 std::string Codec::uncompress(
@@ -131,7 +227,16 @@ std::string Codec::uncompress(
     return "";
   }
 
-  return doUncompressString(data, uncompressedLength);
+  bool const logging = folly::Random::oneIn(kLoggingRate);
+  folly::Optional<Timer> const timer =
+      logging ? Timer(decompressionMilliseconds_) : folly::Optional<Timer>();
+  auto result = doUncompressString(data, uncompressedLength);
+  if (logging) {
+    decompressions_++;
+    bytesBeforeDecompression_ += data.size();
+    bytesAfterDecompression_ += result.size();
+  }
+  return result;
 }
 
 bool Codec::needsUncompressedLength() const {
@@ -551,23 +656,24 @@ std::unique_ptr<Codec> LZ4Codec::create(int level, CodecType type) {
   return std::make_unique<LZ4Codec>(level, type);
 }
 
-LZ4Codec::LZ4Codec(int level, CodecType type) : Codec(type) {
-  DCHECK(type == CodecType::LZ4 || type == CodecType::LZ4_VARINT_SIZE);
-
+static bool lz4ConvertLevel(int level) {
   switch (level) {
+    case 1:
     case COMPRESSION_LEVEL_FASTEST:
     case COMPRESSION_LEVEL_DEFAULT:
-      level = 1;
-      break;
+      return 1;
+    case 2:
     case COMPRESSION_LEVEL_BEST:
-      level = 2;
-      break;
-  }
-  if (level < 1 || level > 2) {
-    throw std::invalid_argument(to<std::string>(
-        "LZ4Codec: invalid level: ", level));
+      return 2;
   }
-  highCompression_ = (level > 1);
+  throw std::invalid_argument(
+      to<std::string>("LZ4Codec: invalid level: ", level));
+}
+
+LZ4Codec::LZ4Codec(int level, CodecType type)
+    : Codec(type, lz4ConvertLevel(level)),
+      highCompression_(lz4ConvertLevel(level) > 1) {
+  DCHECK(type == CodecType::LZ4 || type == CodecType::LZ4_VARINT_SIZE);
 }
 
 bool LZ4Codec::doNeedsUncompressedLength() const {
@@ -739,20 +845,20 @@ void LZ4FrameCodec::resetDCtx() {
   dirty_ = false;
 }
 
-LZ4FrameCodec::LZ4FrameCodec(int level, CodecType type) : Codec(type) {
-  DCHECK(type == CodecType::LZ4_FRAME);
+static int lz4fConvertLevel(int level) {
   switch (level) {
     case COMPRESSION_LEVEL_FASTEST:
     case COMPRESSION_LEVEL_DEFAULT:
-      level_ = 0;
-      break;
+      return 0;
     case COMPRESSION_LEVEL_BEST:
-      level_ = 16;
-      break;
-    default:
-      level_ = level;
-      break;
+      return 16;
   }
+  return level;
+}
+
+LZ4FrameCodec::LZ4FrameCodec(int level, CodecType type)
+    : Codec(type, lz4fConvertLevel(level)), level_(lz4fConvertLevel(level)) {
+  DCHECK(type == CodecType::LZ4_FRAME);
 }
 
 LZ4FrameCodec::~LZ4FrameCodec() {
@@ -1393,25 +1499,26 @@ std::unique_ptr<StreamCodec> ZSTDStreamCodec::createStream(
   return make_unique<ZSTDStreamCodec>(level, type);
 }
 
-ZSTDStreamCodec::ZSTDStreamCodec(int level, CodecType type)
-    : StreamCodec(type) {
-  DCHECK(type == CodecType::ZSTD);
+static int zstdConvertLevel(int level) {
   switch (level) {
     case COMPRESSION_LEVEL_FASTEST:
-      level = 1;
-      break;
+      return 1;
     case COMPRESSION_LEVEL_DEFAULT:
-      level = 1;
-      break;
+      return 1;
     case COMPRESSION_LEVEL_BEST:
-      level = 19;
-      break;
+      return 19;
   }
   if (level < 1 || level > ZSTD_maxCLevel()) {
     throw std::invalid_argument(
         to<std::string>("ZSTD: invalid level: ", level));
   }
-  level_ = level;
+  return level;
+}
+
+ZSTDStreamCodec::ZSTDStreamCodec(int level, CodecType type)
+    : StreamCodec(type, zstdConvertLevel(level)),
+      level_(zstdConvertLevel(level)) {
+  DCHECK(type == CodecType::ZSTD);
 }
 
 bool ZSTDStreamCodec::doNeedsUncompressedLength() const {
@@ -1910,7 +2017,7 @@ void AutomaticCodec::addCodecIfSupported(CodecType type) {
 AutomaticCodec::AutomaticCodec(
     std::vector<std::unique_ptr<Codec>> customCodecs,
     std::unique_ptr<Codec> terminalCodec)
-    : Codec(CodecType::USER_DEFINED),
+    : Codec(CodecType::USER_DEFINED, folly::none, "auto"),
       codecs_(std::move(customCodecs)),
       terminalCodec_(std::move(terminalCodec)) {
   // Fastest -> slowest