2 * Copyright 2017-present Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
19 #include <folly/Function.h>
20 #include <folly/IndexedMemPool.h>
21 #include <folly/Portability.h>
22 #include <folly/concurrency/CacheLocality.h>
23 #include <folly/synchronization/SaturatingSemaphore.h>
32 /// Flat combining (FC) was introduced in the SPAA 2010 paper Flat
33 /// Combining and the Synchronization-Parallelism Tradeoff, by Danny
34 /// Hendler, Itai Incze, Nir Shavit, and Moran Tzafrir.
35 /// http://mcg.cs.tau.ac.il/projects/projects/flat-combining
37 /// FC is an alternative to coarse-grained locking for making
38 /// sequential data structures thread-safe while minimizing the
39 /// synchronization overheads and cache coherence traffic associated
42 /// Under FC, when a thread finds the lock contended, it can
43 /// request (using a request record) that the lock holder execute its
44 /// operation on the shared data structure. There can be a designated
45 /// combiner thread or any thread can act as the combiner when it
48 /// Potential advantages of FC include:
49 /// - Reduced cache coherence traffic
50 /// - Reduced synchronization overheads, as the overheads of releasing
51 /// and acquiring the lock are eliminated from the critical path of
52 /// operating on the data structure.
53 /// - Opportunities for smart combining, where executing multiple
54 /// operations together may take less time than executing the
55 /// operations separately, e.g., K delete_min operations on a
56 /// priority queue may be combined to take O(K + log N) time instead
59 /// This implementation of flat combining supports:
61 /// - A simple interface that requires minimal extra code by the
62 /// user. To use this interface efficiently the user-provided
63 /// functions must be copyable to folly::Function without dynamic
64 /// allocation. If this is impossible or inconvenient, the user is
65 /// encouraged to use the custom interface described below.
66 /// - A custom interface that supports custom combining and custom
67 /// request structure, either for the sake of smart combining or for
68 /// efficiently supporting operations that are not copyable to
69 /// folly::Function without dynamic allocation.
70 /// - Both synchronous and asynchronous operations.
71 /// - Request records with and without thread-caching.
72 /// - Combining with and without a dedicated combiner thread.
74 /// This implementation differs from the algorithm in the SPAA 2010 paper:
75 /// - It does not require thread caching of request records
76 /// - It supports a dedicated combiner
77 /// - It supports asynchronous operations
79 /// The generic FC class template supports generic data structures and
80 /// utilities with arbitrary operations. The template supports static
81 /// polymorphism for the combining function to enable custom smart
84 /// A simple example of using the FC template:
85 /// class ConcurrentFoo : public FlatCombining<ConcurrentFoo> {
86 /// Foo foo_; // sequential data structure
88 /// T bar(V& v) { // thread-safe execution of foo_.bar(v)
90 /// // Note: fn must be copyable to folly::Function without dynamic
91 /// // allocation. Otherwise, it is recommended to use the custom
92 /// // interface and manage the function arguments and results
93 /// // explicitly in a custom request structure.
94 /// auto fn = [&] { result = foo_.bar(v); };
95 /// this->requestFC(fn);
100 /// See test/FlatCombiningExamples.h for more examples. See the
101 /// comments for requestFC() below for a list of simple and custom
102 /// variants of that function.
105 typename T, // concurrent data structure using FC interface
106 typename Mutex = std::mutex,
107 template <typename> class Atom = std::atomic,
108 typename Req = /* default dummy type */ bool>
109 class FlatCombining {
110 using SavedFn = folly::Function<void()>;
113 /// Combining request record.
115 alignas(hardware_destructive_interference_size)
116 folly::SaturatingSemaphore<false, Atom> valid_;
117 folly::SaturatingSemaphore<false, Atom> done_;
118 folly::SaturatingSemaphore<false, Atom> disconnected_;
139 bool isValid() const {
140 return valid_.ready();
151 bool isDone() const {
152 return done_.ready();
159 void setDisconnected() {
160 disconnected_.post();
163 void clearDisconnected() {
164 disconnected_.reset();
167 bool isDisconnected() const {
168 return disconnected_.ready();
171 void setIndex(const size_t index) {
175 size_t getIndex() const {
179 void setNext(const size_t next) {
183 size_t getNext() const {
187 void setLast(const uint64_t pass) {
191 uint64_t getLast() const {
199 template <typename Func>
200 void setFn(Func&& fn) {
202 std::is_nothrow_constructible<
203 folly::Function<void()>,
204 _t<std::decay<Func>>>::value,
205 "Try using a smaller function object that can fit in folly::Function "
206 "without allocation, or use the custom interface of requestFC() to "
207 "manage the requested function's arguments and results explicitly "
208 "in a custom request structure without allocation.");
209 fn_ = std::forward<Func>(fn);
230 IndexedMemPool<Rec, 32, 4, Atom, IndexedMemPoolTraitsLazyRecycle<Rec>>;
233 /// The constructor takes three optional arguments:
234 /// - Optional dedicated combiner thread (default true)
235 /// - Number of records (if 0, then kDefaultNumRecs)
236 /// - A hint for the max. number of combined operations per
237 /// combining session that is checked at the beginning of each pass
238 /// on the request records (if 0, then kDefaultMaxops)
239 explicit FlatCombining(
240 const bool dedicated = true,
241 const uint32_t numRecs = 0, // number of combining records
242 const uint32_t maxOps = 0 // hint of max ops per combining session
244 : numRecs_(numRecs == 0 ? kDefaultNumRecs : numRecs),
245 maxOps_(maxOps == 0 ? kDefaultMaxOps : maxOps),
247 dedicated_(dedicated),
248 recsPool_(numRecs_) {
250 // dedicated combiner thread
251 combiner_ = std::thread([this] { dedicatedCombining(); });
255 /// Destructor: If there is a dedicated combiner, the destructor
256 /// flags it to shutdown. Otherwise, the destructor waits for all
257 /// pending asynchronous requests to be completed.
267 // Wait for all pending operations to complete. Useful primarily
268 // when there are asynchronous operations without a dedicated
271 for (size_t i = getRecsHead(); i != NULL_INDEX; i = nextIndex(i)) {
272 Rec& rec = recsPool_[i];
277 // Give the caller exclusive access.
278 void acquireExclusive() {
282 // Try to give the caller exclusive access. Returns true iff successful.
283 bool tryExclusive() {
284 return m_.try_lock();
287 // Release exclusive access. The caller must have exclusive access.
288 void releaseExclusive() {
292 // Give the lock holder ownership of the mutex and exclusive access.
293 // No need for explicit release.
294 template <typename LockHolder>
295 void holdLock(LockHolder& l) {
299 // Give the caller's lock holder ownership of the mutex but without
300 // exclusive access. The caller can later use the lock holder to try
301 // to acquire exclusive access.
302 template <typename LockHolder>
303 void holdLock(LockHolder& l, std::defer_lock_t) {
304 l = LockHolder(m_, std::defer_lock);
307 // Execute an operation without combining
308 template <typename OpFunc>
309 void requestNoFC(OpFunc& opFn) {
310 std::lock_guard<Mutex> guard(m_);
314 // This function first tries to execute the operation without
315 // combining. If unsuccessful, it allocates a combining record if
316 // needed. If there are no available records, it waits for exclusive
317 // access and executes the operation. If a record is available and
318 // ready for use, it fills the record and indicates that the request
319 // is valid for combining. If the request is synchronous (by default
320 // or necessity), it waits for the operation to be completed by a
321 // combiner and optionally extracts the result, if any.
323 // This function can be called in several forms:
324 // Simple forms that do not require the user to define a Req structure
325 // or to override any request processing member functions:
327 // requestFC(opFn, rec) // provides its own pre-allocated record
328 // requestFC(opFn, rec, syncop) // asynchronous if syncop == false
329 // Custom forms that require the user to define a Req structure and to
330 // override some request processing member functions:
331 // requestFC(opFn, fillFn)
332 // requestFC(opFn, fillFn, rec)
333 // requestFC(opFn, fillFn, rec, syncop)
334 // requestFC(opFn, fillFn, resFn)
335 // requestFC(opFn, fillFn, resFn, rec)
336 template <typename OpFunc>
337 void requestFC(OpFunc&& opFn, Rec* rec = nullptr, bool syncop = true) {
338 auto dummy = [](Req&) {};
340 std::forward<OpFunc>(opFn),
347 template <typename OpFunc, typename FillFunc>
350 const FillFunc& fillFn,
352 bool syncop = true) {
353 auto dummy = [](Req&) {};
355 std::forward<OpFunc>(opFn),
362 template <typename OpFunc, typename FillFunc, typename ResFn>
365 const FillFunc& fillFn,
367 Rec* rec = nullptr) {
368 // must wait for result to execute resFn -- so it must be synchronous
370 std::forward<OpFunc>(opFn),
378 // Allocate a record.
380 auto idx = recsPool_.allocIndex();
381 if (idx == NULL_INDEX) {
384 Rec& rec = recsPool_[idx];
390 void freeRec(Rec* rec) {
391 if (rec == nullptr) {
394 auto idx = rec->getIndex();
395 recsPool_.recycleIndex(idx);
398 // Returns the number of uncombined operations so far.
399 uint64_t getNumUncombined() const {
403 // Returns the number of combined operations so far.
404 uint64_t getNumCombined() const {
408 // Returns the number of combining passes so far.
409 uint64_t getNumPasses() const {
413 // Returns the number of combining sessions so far.
414 uint64_t getNumSessions() const {
419 const size_t NULL_INDEX = 0;
420 const uint32_t kDefaultMaxOps = 100;
421 const uint64_t kDefaultNumRecs = 64;
422 const uint64_t kIdleThreshold = 10;
424 alignas(hardware_destructive_interference_size) Mutex m_;
426 alignas(hardware_destructive_interference_size)
427 folly::SaturatingSemaphore<true, Atom> pending_;
428 Atom<bool> shutdown_{false};
430 alignas(hardware_destructive_interference_size) uint32_t numRecs_;
434 std::thread combiner_;
437 alignas(hardware_destructive_interference_size) uint64_t uncombined_ = 0;
438 uint64_t combined_ = 0;
439 uint64_t passes_ = 0;
440 uint64_t sessions_ = 0;
442 template <typename OpFunc, typename FillFunc, typename ResFn>
445 const FillFunc& fillFn,
450 std::unique_lock<Mutex> l(this->m_, std::defer_lock);
460 bool tc = (rec != nullptr);
462 // if an async op doesn't have a thread-cached record then turn
463 // it into a synchronous op.
467 if (rec == nullptr) {
468 // Can't use FC - Must acquire lock
477 // Wait if record is in use
482 // Fill the request (custom)
483 Req& req = rec->getReq();
487 rec->setFn(std::forward<OpFunc>(opFn));
489 // Indicate that record is valid
490 assert(!rec->isValid());
492 // end of combining critical path
494 // store-load order setValid before isDisconnected
495 std::atomic_thread_fence(std::memory_order_seq_cst);
496 if (rec->isDisconnected()) {
497 rec->clearDisconnected();
498 pushRec(rec->getIndex());
501 // If synchronous wait for the request to be completed
505 Req& req = rec->getReq();
506 resFn(req); // Extract the result (custom)
509 freeRec(rec); // Free the temporary record.
514 void pushRec(size_t idx) {
515 Rec& rec = recsPool_[idx];
517 auto head = recs_.load(std::memory_order_acquire);
518 rec.setNext(head); // there shouldn't be a data race here
519 if (recs_.compare_exchange_weak(head, idx)) {
525 size_t getRecsHead() {
526 return recs_.load(std::memory_order_acquire);
529 size_t nextIndex(size_t idx) {
530 return recsPool_[idx].getNext();
533 void clearPending() {
541 bool isPending() const {
542 return pending_.ready();
545 void awaitPending() {
549 uint64_t combiningSession() {
550 uint64_t combined = 0;
552 uint64_t count = static_cast<T*>(this)->combiningPass();
558 } while (combined < this->maxOps_);
562 void tryCombining() {
564 while (isPending()) {
567 combined_ += combiningSession();
572 void dedicatedCombining() {
576 if (shutdown_.load()) {
583 std::lock_guard<Mutex> guard(m_);
584 count = combiningSession();
587 if (count < maxOps_) {
594 void awaitDone(Rec& rec) {
598 awaitDoneTryLock(rec);
602 /// Waits for the request to be done and occasionally tries to
603 /// acquire the lock and to do combining. Used only in the absence
604 /// of a dedicated combiner.
605 void awaitDoneTryLock(Rec& rec) {
608 while (!rec.isDone()) {
610 std::unique_lock<Mutex> l(m_, std::defer_lock);
616 folly::asm_volatile_pause();
617 if (++count == 1000) {
625 shutdown_.store(true);
629 /// The following member functions may be overridden for customization
631 void combinedOp(Req&) {
632 throw std::runtime_error(
633 "FlatCombining::combinedOp(Req&) must be overridden in the derived"
634 " class if called.");
637 void processReq(Rec& rec) {
638 SavedFn& opFn = rec.getFn();
644 Req& req = rec.getReq();
645 static_cast<T*>(this)->combinedOp(req); // defined in derived class
647 rec.setLast(passes_);
651 uint64_t combiningPass() {
653 auto idx = getRecsHead();
655 while (idx != NULL_INDEX) {
656 Rec& rec = recsPool_[idx];
657 auto next = rec.getNext();
658 bool valid = rec.isValid();
659 if (!valid && (passes_ - rec.getLast() > kIdleThreshold) &&
663 rec.setDisconnected();
664 // store-load order setDisconnected before isValid
665 std::atomic_thread_fence(std::memory_order_seq_cst);
666 valid = rec.isValid();