/*
- * Copyright 2017 Facebook, Inc.
+ * Copyright 2017-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
#pragma once
-#include <folly/Baton.h>
#include <folly/Function.h>
#include <folly/IndexedMemPool.h>
#include <folly/Portability.h>
-#include <folly/detail/CacheLocality.h>
+#include <folly/concurrency/CacheLocality.h>
+#include <folly/synchronization/SaturatingSemaphore.h>
#include <atomic>
#include <cassert>
#include <mutex>
+#include <thread>
namespace folly {
///
/// FC is an alternative to coarse-grained locking for making
/// sequential data structures thread-safe while minimizing the
-/// synchroniation overheads and cache coherence traffic associated
+/// synchronization overheads and cache coherence traffic associated
/// with locking.
///
/// Under FC, when a thread finds the lock contended, it can
/// and acquiring the lock are eliminated from the critical path of
/// operating on the data structure.
/// - Opportunities for smart combining, where executing multiple
-/// operations together may take less time than executng the
+/// operations together may take less time than executing the
/// operations separately, e.g., K delete_min operations on a
/// priority queue may be combined to take O(K + log N) time instead
/// of O(K * log N).
/// - A simple interface that requires minimal extra code by the
/// user. To use this interface efficiently the user-provided
-/// functions must be copyable to folly::Functio without dynamic
+/// functions must be copyable to folly::Function without dynamic
/// allocation. If this is impossible or inconvenient, the user is
/// encouraged to use the custom interface described below.
-/// - A custom interface that supports custom combinining and custom
+/// - A custom interface that supports custom combining and custom
/// request structure, either for the sake of smart combining or for
/// efficiently supporting operations that are not be copyable to
-/// folly::Function without synamic allocation.
+/// folly::Function without dynamic allocation.
/// - Both synchronous and asynchronous operations.
/// - Request records with and without thread-caching.
/// - Combining with and without a dedicated combiner thread.
/// class ConcurrentFoo : public FlatCombining<ConcurrentFoo> {
/// Foo foo_; // sequential data structure
/// public:
-/// T bar(V v) { // thread-safe execution of foo_.bar(v)
+/// T bar(V& v) { // thread-safe execution of foo_.bar(v)
/// T result;
/// // Note: fn must be copyable to folly::Function without dynamic
/// // allocation. Otherwise, it is recommended to use the custom
/// Combining request record.
class Rec {
FOLLY_ALIGN_TO_AVOID_FALSE_SHARING
- folly::Baton<Atom, true, false> valid_;
- folly::Baton<Atom, true, false> done_;
- folly::Baton<Atom, true, false> disconnected_;
+ folly::SaturatingSemaphore<false, Atom> valid_;
+ folly::SaturatingSemaphore<false, Atom> done_;
+ folly::SaturatingSemaphore<false, Atom> disconnected_;
size_t index_;
size_t next_;
uint64_t last_;
}
bool isValid() const {
- return valid_.try_wait();
+ return valid_.ready();
}
void setDone() {
}
bool isDone() const {
- return done_.try_wait();
+ return done_.ready();
}
void awaitDone() {
}
bool isDisconnected() const {
- return disconnected_.try_wait();
+ return disconnected_.ready();
}
void setIndex(const size_t index) {
}
};
- using Pool = folly::IndexedMemPool<Rec, 32, 4, Atom, false, false>;
+ using Pool = folly::
+ IndexedMemPool<Rec, 32, 4, Atom, IndexedMemPoolTraitsLazyRecycle<Rec>>;
public:
/// The constructor takes three optional arguments:
/// on the request records (if 0, then kDefaultMaxops)
explicit FlatCombining(
const bool dedicated = true,
- uint32_t numRecs = 0, // number of combining records
+ const uint32_t numRecs = 0, // number of combining records
const uint32_t maxOps = 0 // hint of max ops per combining session
)
: numRecs_(numRecs == 0 ? kDefaultNumRecs : numRecs),
m_.lock();
}
- // Give the caller exclusive access through a lock holder.
- // No need for explicit release.
- template <typename LockHolder>
- void acquireExclusive(LockHolder& l) {
- l = LockHolder(m_);
- }
-
// Try to give the caller exclusive access. Returns true iff successful.
bool tryExclusive() {
return m_.try_lock();
m_.unlock();
}
+ // Give the lock holder ownership of the mutex and exclusive access.
+ // No need for explicit release.
+ template <typename LockHolder>
+ void holdLock(LockHolder& l) {
+ l = LockHolder(m_);
+ }
+
+ // Give the caller's lock holder ownership of the mutex but without
+ // exclusive access. The caller can later use the lock holder to try
+ // to acquire exclusive access.
+ template <typename LockHolder>
+ void holdLock(LockHolder& l, std::defer_lock_t) {
+ l = LockHolder(m_, std::defer_lock);
+ }
+
// Execute an operation without combining
template <typename OpFunc>
void requestNoFC(OpFunc& opFn) {
Rec* allocRec() {
auto idx = recsPool_.allocIndex();
if (idx == NULL_INDEX) {
- outOfSpaceCount_.fetch_add(1);
return nullptr;
}
Rec& rec = recsPool_[idx];
recsPool_.recycleIndex(idx);
}
- // Returns a count of the number of combined operations so far.
- uint64_t getCombinedOpCount() {
- std::lock_guard<Mutex> guard(m_);
+ // Returns the number of uncombined operations so far.
+ uint64_t getNumUncombined() const {
+ return uncombined_;
+ }
+
+ // Returns the number of combined operations so far.
+ uint64_t getNumCombined() const {
return combined_;
}
- // Returns a count of the number of combining passes so far.
- uint64_t getCombiningPasses() {
- std::lock_guard<Mutex> guard(m_);
+ // Returns the number of combining passes so far.
+ uint64_t getNumPasses() const {
return passes_;
}
- uint64_t getOutOfSpaceCount() {
- return outOfSpaceCount_.load();
+ // Returns the number of combining sessions so far.
+ uint64_t getNumSessions() const {
+ return sessions_;
}
protected:
Mutex m_;
FOLLY_ALIGN_TO_AVOID_FALSE_SHARING
- folly::Baton<Atom, false, true> pending_;
+ folly::SaturatingSemaphore<true, Atom> pending_;
Atom<bool> shutdown_{false};
FOLLY_ALIGN_TO_AVOID_FALSE_SHARING
Pool recsPool_;
FOLLY_ALIGN_TO_AVOID_FALSE_SHARING
+ uint64_t uncombined_ = 0;
uint64_t combined_ = 0;
uint64_t passes_ = 0;
uint64_t sessions_ = 0;
- Atom<uint64_t> outOfSpaceCount_{0};
template <typename OpFunc, typename FillFunc, typename ResFn>
void requestOp(
std::unique_lock<Mutex> l(this->m_, std::defer_lock);
if (l.try_lock()) {
// No contention
+ ++uncombined_;
tryCombining();
opFn();
return;
if (rec == nullptr) {
// Can't use FC - Must acquire lock
l.lock();
+ ++uncombined_;
+ tryCombining();
opFn();
return;
}
}
bool isPending() const {
- return pending_.try_wait();
+ return pending_.ready();
}
void awaitPending() {
if (!dedicated_) {
while (isPending()) {
clearPending();
+ ++sessions_;
combined_ += combiningSession();
}
}
}
};
-} // namespace folly {
+} // namespace folly