FlatCombining: Use SaturatingSemaphore instead of multi-poster and non-blocking Baton
[folly.git] / folly / experimental / flat_combining / FlatCombining.h
index 445d29a30ae9ee0e3c66aa9a378681a18d4aa3ee..bf19e235b5927aa53e819a4c2404e4f4605be60b 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright 2017 Facebook, Inc.
+ * Copyright 2017-present Facebook, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
 
 #pragma once
 
-#include <folly/Baton.h>
 #include <folly/Function.h>
 #include <folly/IndexedMemPool.h>
 #include <folly/Portability.h>
-#include <folly/detail/CacheLocality.h>
+#include <folly/concurrency/CacheLocality.h>
+#include <folly/synchronization/SaturatingSemaphore.h>
 
 #include <atomic>
 #include <cassert>
 #include <mutex>
+#include <thread>
 
 namespace folly {
 
@@ -35,7 +36,7 @@ namespace folly {
 ///
 /// FC is an alternative to coarse-grained locking for making
 /// sequential data structures thread-safe while minimizing the
-/// synchroniation overheads and cache coherence traffic associated
+/// synchronization overheads and cache coherence traffic associated
 /// with locking.
 ///
 /// Under FC, when a thread finds the lock contended, it can
@@ -50,7 +51,7 @@ namespace folly {
 ///   and acquiring the lock are eliminated from the critical path of
 ///   operating on the data structure.
 /// - Opportunities for smart combining, where executing multiple
-///   operations together may take less time than executng the
+///   operations together may take less time than executing the
 ///   operations separately, e.g., K delete_min operations on a
 ///   priority queue may be combined to take O(K + log N) time instead
 ///   of O(K * log N).
@@ -59,13 +60,13 @@ namespace folly {
 
 /// - A simple interface that requires minimal extra code by the
 ///   user. To use this interface efficiently the user-provided
-///   functions must be copyable to folly::Functio without dynamic
+///   functions must be copyable to folly::Function without dynamic
 ///   allocation. If this is impossible or inconvenient, the user is
 ///   encouraged to use the custom interface described below.
-/// - A custom interface that supports custom combinining and custom
+/// - A custom interface that supports custom combining and custom
 ///   request structure, either for the sake of smart combining or for
 ///   efficiently supporting operations that are not be copyable to
-///   folly::Function without synamic allocation.
+///   folly::Function without dynamic allocation.
 /// - Both synchronous and asynchronous operations.
 /// - Request records with and without thread-caching.
 /// - Combining with and without a dedicated combiner thread.
@@ -84,7 +85,7 @@ namespace folly {
 ///   class ConcurrentFoo : public FlatCombining<ConcurrentFoo> {
 ///     Foo foo_; // sequential data structure
 ///    public:
-///     T bar(V v) { // thread-safe execution of foo_.bar(v)
+///     T bar(V& v) { // thread-safe execution of foo_.bar(v)
 ///       T result;
 ///       // Note: fn must be copyable to folly::Function without dynamic
 ///       // allocation. Otherwise, it is recommended to use the custom
@@ -112,9 +113,9 @@ class FlatCombining {
   /// Combining request record.
   class Rec {
     FOLLY_ALIGN_TO_AVOID_FALSE_SHARING
-    folly::Baton<Atom, true, false> valid_;
-    folly::Baton<Atom, true, false> done_;
-    folly::Baton<Atom, true, false> disconnected_;
+    folly::SaturatingSemaphore<false, Atom> valid_;
+    folly::SaturatingSemaphore<false, Atom> done_;
+    folly::SaturatingSemaphore<false, Atom> disconnected_;
     size_t index_;
     size_t next_;
     uint64_t last_;
@@ -136,7 +137,7 @@ class FlatCombining {
     }
 
     bool isValid() const {
-      return valid_.try_wait();
+      return valid_.ready();
     }
 
     void setDone() {
@@ -148,7 +149,7 @@ class FlatCombining {
     }
 
     bool isDone() const {
-      return done_.try_wait();
+      return done_.ready();
     }
 
     void awaitDone() {
@@ -164,7 +165,7 @@ class FlatCombining {
     }
 
     bool isDisconnected() const {
-      return disconnected_.try_wait();
+      return disconnected_.ready();
     }
 
     void setIndex(const size_t index) {
@@ -225,7 +226,8 @@ class FlatCombining {
     }
   };
 
-  using Pool = folly::IndexedMemPool<Rec, 32, 4, Atom, false, false>;
+  using Pool = folly::
+      IndexedMemPool<Rec, 32, 4, Atom, IndexedMemPoolTraitsLazyRecycle<Rec>>;
 
  public:
   /// The constructor takes three optional arguments:
@@ -236,7 +238,7 @@ class FlatCombining {
   ///   on the request records (if 0, then kDefaultMaxops)
   explicit FlatCombining(
       const bool dedicated = true,
-      uint32_t numRecs = 0, // number of combining records
+      const uint32_t numRecs = 0, // number of combining records
       const uint32_t maxOps = 0 // hint of max ops per combining session
       )
       : numRecs_(numRecs == 0 ? kDefaultNumRecs : numRecs),
@@ -277,13 +279,6 @@ class FlatCombining {
     m_.lock();
   }
 
-  // Give the caller exclusive access through a lock holder.
-  // No need for explicit release.
-  template <typename LockHolder>
-  void acquireExclusive(LockHolder& l) {
-    l = LockHolder(m_);
-  }
-
   // Try to give the caller exclusive access. Returns true iff successful.
   bool tryExclusive() {
     return m_.try_lock();
@@ -294,6 +289,21 @@ class FlatCombining {
     m_.unlock();
   }
 
+  // Give the lock holder ownership of the mutex and exclusive access.
+  // No need for explicit release.
+  template <typename LockHolder>
+  void holdLock(LockHolder& l) {
+    l = LockHolder(m_);
+  }
+
+  // Give the caller's lock holder ownership of the mutex but without
+  // exclusive access. The caller can later use the lock holder to try
+  // to acquire exclusive access.
+  template <typename LockHolder>
+  void holdLock(LockHolder& l, std::defer_lock_t) {
+    l = LockHolder(m_, std::defer_lock);
+  }
+
   // Execute an operation without combining
   template <typename OpFunc>
   void requestNoFC(OpFunc& opFn) {
@@ -369,7 +379,6 @@ class FlatCombining {
   Rec* allocRec() {
     auto idx = recsPool_.allocIndex();
     if (idx == NULL_INDEX) {
-      outOfSpaceCount_.fetch_add(1);
       return nullptr;
     }
     Rec& rec = recsPool_[idx];
@@ -386,20 +395,24 @@ class FlatCombining {
     recsPool_.recycleIndex(idx);
   }
 
-  // Returns a count of the number of combined operations so far.
-  uint64_t getCombinedOpCount() {
-    std::lock_guard<Mutex> guard(m_);
+  // Returns the number of uncombined operations so far.
+  uint64_t getNumUncombined() const {
+    return uncombined_;
+  }
+
+  // Returns the number of combined operations so far.
+  uint64_t getNumCombined() const {
     return combined_;
   }
 
-  // Returns a count of the number of combining passes so far.
-  uint64_t getCombiningPasses() {
-    std::lock_guard<Mutex> guard(m_);
+  // Returns the number of combining passes so far.
+  uint64_t getNumPasses() const {
     return passes_;
   }
 
-  uint64_t getOutOfSpaceCount() {
-    return outOfSpaceCount_.load();
+  // Returns the number of combining sessions so far.
+  uint64_t getNumSessions() const {
+    return sessions_;
   }
 
  protected:
@@ -412,7 +425,7 @@ class FlatCombining {
   Mutex m_;
 
   FOLLY_ALIGN_TO_AVOID_FALSE_SHARING
-  folly::Baton<Atom, false, true> pending_;
+  folly::SaturatingSemaphore<true, Atom> pending_;
   Atom<bool> shutdown_{false};
 
   FOLLY_ALIGN_TO_AVOID_FALSE_SHARING
@@ -424,10 +437,10 @@ class FlatCombining {
   Pool recsPool_;
 
   FOLLY_ALIGN_TO_AVOID_FALSE_SHARING
+  uint64_t uncombined_ = 0;
   uint64_t combined_ = 0;
   uint64_t passes_ = 0;
   uint64_t sessions_ = 0;
-  Atom<uint64_t> outOfSpaceCount_{0};
 
   template <typename OpFunc, typename FillFunc, typename ResFn>
   void requestOp(
@@ -440,6 +453,7 @@ class FlatCombining {
     std::unique_lock<Mutex> l(this->m_, std::defer_lock);
     if (l.try_lock()) {
       // No contention
+      ++uncombined_;
       tryCombining();
       opFn();
       return;
@@ -456,6 +470,8 @@ class FlatCombining {
     if (rec == nullptr) {
       // Can't use FC - Must acquire lock
       l.lock();
+      ++uncombined_;
+      tryCombining();
       opFn();
       return;
     }
@@ -526,7 +542,7 @@ class FlatCombining {
   }
 
   bool isPending() const {
-    return pending_.try_wait();
+    return pending_.ready();
   }
 
   void awaitPending() {
@@ -550,6 +566,7 @@ class FlatCombining {
     if (!dedicated_) {
       while (isPending()) {
         clearPending();
+        ++sessions_;
         combined_ += combiningSession();
       }
     }
@@ -663,4 +680,4 @@ class FlatCombining {
   }
 };
 
-} // namespace folly {
+} // namespace folly