#ifndef __CDS_ALGO_FLAT_COMBINING_H
#define __CDS_ALGO_FLAT_COMBINING_H

#include <cds/cxx11_atomic.h>
#include <cds/details/allocator.h>
#include <cds/algo/backoff_strategy.h>
#include <cds/lock/spinlock.h>
#include <cds/opt/options.h>
#include <cds/algo/int_algo.h>
#include <boost/thread/tss.hpp>     // thread_specific_ptr

namespace cds { namespace algo {

/// @defgroup cds_flat_combining_intrusive Intrusive flat combining containers
/// @defgroup cds_flat_combining_container Non-intrusive flat combining containers
@anchor cds_flat_combining_description
The flat combining (FC) technique was invented by Hendler, Incze, Shavit and Tzafrir in their paper
[2010] <i>"Flat Combining and the Synchronization-Parallelism Tradeoff"</i>.
The technique converts a sequential data structure into a concurrent implementation.
A few structures are added to the sequential implementation: a <i>global lock</i>,
a <i>count</i> of the number of combining passes, and a pointer to the <i>head</i>
of a <i>publication list</i>. The publication list is a list of thread-local records
of a size proportional to the number of threads that are concurrently accessing the shared object.

Each thread \p t accessing the structure to perform an invocation of some method \p m
on the shared object executes the following sequence of steps:
<li>Write the invocation opcode and parameters (if any) of the method \p m to be applied
sequentially to the shared object in the <i>request</i> field of your thread-local publication
record (there is no need to use a load-store memory barrier). The <i>request</i> field will later
be used to receive the response. If your thread-local publication record is marked as active,
continue to step 2; otherwise continue to step 5.</li>
<li>Check if the global lock is taken. If so (another thread is an active combiner), spin on the <i>request</i>
field waiting for a response to the invocation (one can add a yield at this point to allow other threads
on the same core to run). Every once in a while while spinning, check if the lock is still taken and that your
record is active. If your record is inactive, proceed to step 5. Once the response is available,
reset the request field to null and return the response.</li>
<li>If the lock is not taken, attempt to acquire it and become a combiner. If you fail,
return to spinning in step 2.</li>
<li>Otherwise, you hold the lock and are a combiner.
<li>Increment the combining pass count by one.</li>
<li>Execute a \p fc_apply() by traversing the publication list from the head,
combining all non-null method call invocations, setting the <i>age</i> of each of these records
to the current <i>count</i>, applying the combined method calls to the data structure, and returning
responses to all the invocations. This traversal is guaranteed to be wait-free.</li>
<li>If the <i>count</i> is such that a cleanup needs to be performed, traverse the publication
list from the <i>head</i>. Starting from the second item (we always leave the item pointed to
by the head in the list), remove from the publication list all records whose <i>age</i> is
much smaller than the current <i>count</i>. This is done by removing the node and marking it
as inactive.</li>
<li>Release the lock.</li>
<li>If you have no thread-local publication record, allocate one, marked as active. If you already
have one marked as inactive, mark it as active. Execute a store-load memory barrier. Proceed to insert
the record into the list with a successful CAS to the <i>head</i>. Then proceed to step 1.</li>
As the test results show, the flat combining technique is well suited to non-intrusive containers
such as stacks, queues, and deques. For intrusive concurrent containers flat combining demonstrates
less impressive results. A simplified, self-contained sketch of the technique is given below.
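
To make the scheme above concrete, here is a minimal self-contained C++ illustration of the idea.
It is <b>not</b> the libcds implementation: it replaces the dynamic publication list with a fixed
array of per-thread slots and omits record aging and list compaction; \p toy_fc_counter and all of
its members are hypothetical names used only for this sketch.

\code
#include <atomic>
#include <mutex>
#include <thread>
#include <vector>

struct toy_fc_counter {
    enum { empty, add_op, done };

    struct slot {
        std::atomic<int> request{ empty };  // request field: opcode or response marker
        int              arg = 0;           // operation argument
    };

    std::mutex        combiner_lock;        // the global lock
    std::vector<slot> slots;                // one slot per thread (stands in for the publication list)
    long long         value = 0;            // the protected sequential structure

    explicit toy_fc_counter( size_t nThreads ) : slots( nThreads ) {}

    void add( size_t tid, int x )
    {
        slot& s = slots[ tid ];
        s.arg = x;
        s.request.store( add_op, std::memory_order_release );    // publish the request

        for (;;) {
            if ( combiner_lock.try_lock()) {
                // This thread is the combiner: apply every pending request, including its own
                for ( slot& p : slots ) {
                    if ( p.request.load( std::memory_order_acquire ) == add_op ) {
                        value += p.arg;
                        p.request.store( done, std::memory_order_release );
                    }
                }
                combiner_lock.unlock();
            }
            if ( s.request.load( std::memory_order_acquire ) == done ) {
                s.request.store( empty, std::memory_order_relaxed );
                return;                     // the response has been received
            }
            std::this_thread::yield();      // wait for some combiner to serve the request
        }
    }
};
\endcode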
\ref cds_flat_combining_container "List of FC-based containers" in libcds.

\ref cds_flat_combining_intrusive "List of intrusive FC-based containers" in libcds.

namespace flat_combining {

/// Special values of publication_record::nRequest
req_EmptyRecord,    ///< Publication record is empty
req_Response,       ///< Operation is done

req_Operation       ///< First operation id for derived classes

/// publication_record state
inactive,           ///< Record is inactive
active,             ///< Record is active
removed             ///< Record should be removed

/// Record of publication list
Each data structure based on flat combining contains a class derived from \p %publication_record
struct publication_record {
atomics::atomic<unsigned int> nRequest;         ///< Request field (depends on data structure)
atomics::atomic<unsigned int> nState;           ///< Record state: inactive, active, removed
unsigned int nAge;                              ///< Age of the record
atomics::atomic<publication_record *> pNext;    ///< Next record in publication list
void * pOwner;                                  ///< [internal data] Pointer to \ref kernel object that manages the publication list

/// Initializes publication record
: nRequest( req_EmptyRecord )

/// Returns the value of \p nRequest field
unsigned int op() const
return nRequest.load( atomics::memory_order_relaxed );

/// Checks if the operation is done
return nRequest.load( atomics::memory_order_relaxed ) == req_Response;
/// Flat combining internal statistics
template <typename Counter = cds::atomicity::event_counter >
typedef Counter counter_type;               ///< Event counter type

counter_type m_nOperationCount;             ///< How many operations have been performed
counter_type m_nCombiningCount;             ///< Combining call count
counter_type m_nCompactPublicationList;     ///< Count of publication list compactions
counter_type m_nDeactivatePubRecord;        ///< How many publication records were deactivated during compacting
counter_type m_nActivatePubRecord;          ///< Count of publication record activations
counter_type m_nPubRecordCreated;           ///< Count of created publication records
counter_type m_nPubRecordDeteted;           ///< Count of deleted publication records
counter_type m_nAcquirePubRecCount;         ///< Count of publication record acquisitions
counter_type m_nReleasePubRecCount;         ///< Count of publication record releases

/// Returns the current combining factor
The combining factor is the average number of operations performed per combining pass:
<tt>combining_factor := m_nOperationCount / m_nCombiningCount</tt>

double combining_factor() const
return m_nCombiningCount.get() ? double( m_nOperationCount.get()) / m_nCombiningCount.get() : 0.0;
void onOperation()              { ++m_nOperationCount; }
void onCombining()              { ++m_nCombiningCount; }
void onCompactPublicationList() { ++m_nCompactPublicationList; }
void onDeactivatePubRecord()    { ++m_nDeactivatePubRecord; }
void onActivatPubRecord()       { ++m_nActivatePubRecord; }
void onCreatePubRecord()        { ++m_nPubRecordCreated; }
void onDeletePubRecord()        { ++m_nPubRecordDeteted; }
void onAcquirePubRecord()       { ++m_nAcquirePubRecCount; }
void onReleasePubRecord()       { ++m_nReleasePubRecCount; }

/// Flat combining dummy internal statistics
void onOperation()              {}
void onCombining()              {}
void onCompactPublicationList() {}
void onDeactivatePubRecord()    {}
void onActivatPubRecord()       {}
void onCreatePubRecord()        {}
void onDeletePubRecord()        {}
void onAcquirePubRecord()       {}
void onReleasePubRecord()       {}
/// Type traits of \ref kernel class
You can define different type traits for \ref kernel
by specifying your own struct based on \p %type_traits
or by using the \ref make_traits metafunction.

typedef cds::lock::Spin lock_type;              ///< Lock type
typedef cds::backoff::delay_of<2> back_off;     ///< Back-off strategy
typedef CDS_DEFAULT_ALLOCATOR allocator;        ///< Allocator used for TLS data (allocating publication_record derivatives)
typedef empty_stat stat;                        ///< Internal statistics
typedef opt::v::relaxed_ordering memory_model;  ///< C++ memory ordering model
/// Metafunction converting option list to traits
This is a wrapper for <tt> cds::opt::make_options< type_traits, Options...> </tt>
- \p opt::lock_type - mutex type, default is \p cds::lock::Spin
- \p opt::back_off - back-off strategy, default is \p cds::backoff::delay_of<2>
- \p opt::allocator - allocator type, default is \ref CDS_DEFAULT_ALLOCATOR
- \p opt::stat - internal statistics, possible types: \ref stat, \ref empty_stat (the default)
- \p opt::memory_model - C++ memory ordering model.
  See \p opt::memory_model for the list of all available memory ordering models.
  The default is \p cds::opt::v::relaxed_ordering.
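
For example, the following sketch builds traits with internal statistics and a \p pause back-off.
The names \p my_publication_record, \p my_fc_traits and \p my_fc_kernel are illustrative only;
\p my_publication_record stands for a user-defined type derived from \p publication_record.

\code
typedef cds::algo::flat_combining::make_traits<
    cds::opt::stat< cds::algo::flat_combining::stat<> >
    ,cds::opt::back_off< cds::backoff::pause >
>::type my_fc_traits;

typedef cds::algo::flat_combining::kernel< my_publication_record, my_fc_traits > my_fc_kernel;
\endcode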
template <CDS_DECL_OPTIONS6>
#   ifdef CDS_DOXYGEN_INVOKED
typedef implementation_defined type;    ///< Metafunction result
typedef typename cds::opt::make_options<
    typename cds::opt::find_type_traits< type_traits, CDS_OPTIONS6 >::type
/// The kernel of flat combining
- \p PublicationRecord - a type derived from \ref publication_record
- \p Traits - the type traits of flat combining; the default is \p flat_combining::type_traits.
  The \ref make_traits metafunction can be used to create the type traits.

The kernel object should be a member of a container class. The container cooperates with the
flat combining kernel object. There are two ways to interact with the kernel:
- One-by-one processing of the active records of the publication list. This mode provides the \ref combine function:
  the container acquires its publication record with \ref acquire_record, fills its fields and calls
  the \p combine function of its kernel object. If the current thread becomes a combiner, the kernel
  calls the \p fc_apply function of the container for each active non-empty record. Then, the container
  should release its publication record with \ref release_record. Only one pass through the publication
  list is possible (see the usage sketch below).
- Batch processing (the \ref batch_combine function). In this mode the container obtains access
  to the entire publication list. This mode allows the container to perform elimination; for example,
  a stack can collide \p push and \p pop requests. The sequence of invocations is the following:
  the container acquires its publication record with \ref acquire_record, fills its fields and calls
  the \p batch_combine function of its kernel object. If the current thread becomes a combiner,
  the kernel calls the \p fc_process function of the container passing two iterators pointing to
  the beginning and the end of the publication list (see the \ref iterator class). The iterators allow
  multiple passes through the active records of the publication list. For each processed record the container
  should call the \ref operation_done function. At the end, the container should release
  its record with \ref release_record.
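
A minimal sketch of the one-by-one mode, assuming a hypothetical \p MyCounter container
(illustration only, not a libcds container; \p my_record, \p op_add, \p m_nValue and \p m_FC
are names invented for this example):

\code
class MyCounter {
public:
    struct my_record: public cds::algo::flat_combining::publication_record {
        int nArg;   // operation argument
    };
    enum { op_add = cds::algo::flat_combining::req_Operation };

    void add( int x )
    {
        my_record * pRec = m_FC.acquire_record();   // get (or create) the thread-local record
        pRec->nArg = x;
        m_FC.combine( op_add, pRec, *this );        // become a combiner or wait for another one
        m_FC.release_record( pRec );                // mark the record as empty again
    }

    // Called by the kernel for each active non-empty record when this thread is the combiner
    void fc_apply( my_record * pRec )
    {
        if ( pRec->op() == op_add )
            m_nValue += pRec->nArg;                 // apply the operation to the sequential structure
    }

private:
    int m_nValue = 0;
    cds::algo::flat_combining::kernel< my_record > m_FC;
};
\endcode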
typename PublicationRecord
,typename Traits = type_traits

typedef PublicationRecord publication_record_type;     ///< Publication record type
typedef Traits type_traits;                             ///< Type traits
typedef typename type_traits::lock_type global_lock_type;  ///< Global lock type
typedef typename type_traits::back_off back_off;           ///< Back-off strategy type
typedef typename type_traits::allocator allocator;         ///< Allocator type (used for allocating publication_record_type data)
typedef typename type_traits::stat stat;                    ///< Internal statistics
typedef typename type_traits::memory_model memory_model;   ///< C++ memory model

typedef cds::details::Allocator< publication_record_type, allocator > cxx11_allocator; ///< internal helper cds::details::Allocator
typedef std::lock_guard<global_lock_type> lock_guard;

unsigned int m_nCount;              ///< Count of combining passes
publication_record_type * m_pHead;  ///< Head of publication list
boost::thread_specific_ptr< publication_record_type > m_pThreadRec;    ///< Thread-local publication record
mutable global_lock_type m_Mutex;   ///< Global mutex
mutable stat m_Stat;                ///< Internal statistics
unsigned int const m_nCompactFactor;    ///< Publication list compacting factor (the list will be compacted through \p %m_nCompactFactor combining passes)
unsigned int const m_nCombinePassCount; ///< Number of combining passes

/// Initializes the object
Combiner pass count = 8
, m_pThreadRec( tls_cleanup )
, m_nCompactFactor( 64 - 1 )    // binary mask
, m_nCombinePassCount( 8 )

/// Initializes the object
unsigned int nCompactFactor     ///< Publication list compacting factor (the list will be compacted through \p nCompactFactor combining passes)
,unsigned int nCombinePassCount ///< Number of combining passes for combiner thread
, m_pThreadRec( tls_cleanup )
, m_nCompactFactor( (unsigned int)( cds::beans::ceil2( nCompactFactor ) - 1 ))  // binary mask
, m_nCombinePassCount( nCombinePassCount )
/// Destroys the object and marks all publication records as inactive
// mark all publication records as detached
for ( publication_record * p = m_pHead; p; p = p->pNext.load( memory_model::memory_order_relaxed ))
/// Gets the publication list record for the current thread
If there is no publication record for the current thread
the function allocates it.

publication_record_type * acquire_record()
publication_record_type * pRec = m_pThreadRec.get();
// Allocate new publication record
pRec = cxx11_allocator().New();
pRec->pOwner = reinterpret_cast<void *>( this );
m_pThreadRec.reset( pRec );
m_Stat.onCreatePubRecord();

if ( pRec->nState.load( memory_model::memory_order_acquire ) != active )

assert( pRec->nRequest.load( memory_model::memory_order_relaxed ) == req_EmptyRecord );

m_Stat.onAcquirePubRecord();

/// Marks the publication record for the current thread as empty
void release_record( publication_record_type * pRec )
assert( pRec->is_done() );
pRec->nRequest.store( req_EmptyRecord, memory_model::memory_order_relaxed );
m_Stat.onReleasePubRecord();
/// Tries to execute operation \p nOpId
\p pRec is the publication record acquired earlier by \ref acquire_record.
\p owner is the container that owns the flat combining kernel object.
As a result, the current thread can become a combiner, or it can wait until
another combiner performs the \p pRec operation.

If the thread becomes a combiner, the kernel calls \p owner.fc_apply
for each active non-empty publication record.

template <class Container>
void combine( unsigned int nOpId, publication_record_type * pRec, Container& owner )
assert( nOpId >= req_Operation );

//assert( pRec->nState.load( memory_model::memory_order_relaxed ) == active );
pRec->nRequest.store( nOpId, memory_model::memory_order_release );

m_Stat.onOperation();

try_combining( owner, pRec );
/// Tries to execute operation \p nOpId in batch-combine mode
\p pRec is the publication record acquired earlier by \ref acquire_record.
\p owner is the container that owns the flat combining kernel object.
As a result, the current thread can become a combiner, or it can wait until
another combiner performs the \p pRec operation.

If the thread becomes a combiner, the kernel calls \p owner.fc_process
giving the container full access to the publication list. This function
is useful for elimination techniques, if the container supports any kind of
them. The container can perform multiple passes through the publication list.

\p owner.fc_process has two arguments - forward iterators to the beginning and the end of the
publication list, see the \ref iterator class. For each processed record the container
should call the \ref operation_done function to mark the record as processed.

At the end of \p %batch_combine the \ref combine function is called
to process the rest of the publication records. A sketch of a typical \p fc_process is given below.
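
A sketch of \p fc_process for a hypothetical stack-like container that collides \p push and \p pop
requests. The record fields \p pValue, the opcodes \p op_push / \p op_pop, and the member \p m_FC
(the \p %kernel object) are assumptions made only for this illustration:

\code
// Called by the kernel when this thread is the combiner in batch mode
template <typename Iterator>
void fc_process( Iterator itBegin, Iterator itEnd )
{
    for ( Iterator it = itBegin; it != itEnd; ++it ) {
        if ( it->op() == op_push ) {
            // Look ahead for a pop request to collide with
            for ( Iterator it2 = it; ++it2 != itEnd; ) {
                if ( it2->op() == op_pop ) {
                    it2->pValue = it->pValue;       // hand the pushed value directly to the popper
                    m_FC.operation_done( *it );     // mark both records as processed
                    m_FC.operation_done( *it2 );
                    break;
                }
            }
        }
    }
    // Records not marked as done here are handled by the ordinary combining pass
    // that the kernel runs at the end of batch_combine()
}
\endcode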
template <class Container>
void batch_combine( unsigned int nOpId, publication_record_type * pRec, Container& owner )
assert( nOpId >= req_Operation );

//assert( pRec->nState.load( memory_model::memory_order_relaxed ) == active );
pRec->nRequest.store( nOpId, memory_model::memory_order_release );

m_Stat.onOperation();

try_batch_combining( owner, pRec );

/// Waits for the end of combining
void wait_while_combining() const
lock_guard l( m_Mutex );
/// Marks \p rec as executed
This function should be called by the container if the batch_combine mode is used.
For usual combining (see \ref combine) this function is not needed.

void operation_done( publication_record& rec )
rec.nRequest.store( req_Response, memory_model::memory_order_release );
/// Internal statistics
stat const& statistics() const

// For container classes based on flat combining
stat& internal_statistics() const

/// Returns the compact factor
unsigned int compact_factor() const
return m_nCompactFactor + 1;

/// Returns the number of combining passes for the combiner thread
unsigned int combine_pass_count() const
return m_nCombinePassCount;
/// Publication list iterator
Iterators are intended for batch processing by the container's
\p fc_process function.
The iterator allows iterating through the active records of the publication list.

publication_record_type * m_pRec;

iterator( publication_record_type * pRec )

while ( m_pRec && (m_pRec->nState.load( memory_model::memory_order_acquire ) != active
    || m_pRec->nRequest.load( memory_model::memory_order_relaxed ) < req_Operation ))
    m_pRec = static_cast<publication_record_type *>(m_pRec->pNext.load( memory_model::memory_order_acquire ));
/// Initializes an empty iterator object

iterator( iterator const& src )
: m_pRec( src.m_pRec )

iterator& operator++()
m_pRec = static_cast<publication_record_type *>( m_pRec->pNext.load( memory_model::memory_order_acquire ));

iterator operator++( int )

/// Dereference operator, can return \p nullptr
publication_record_type * operator ->()

/// Dereference operator, the iterator should not be an end iterator
publication_record_type& operator*()

/// Iterator equality
friend bool operator==( iterator it1, iterator it2 )
return it1.m_pRec == it2.m_pRec;

/// Iterator inequality
friend bool operator!=( iterator it1, iterator it2 )
return !( it1 == it2 );

/// Returns an iterator to the first active publication record
iterator begin()    { return iterator( m_pHead ); }

/// Returns an iterator to the end of the publication list. Should not be dereferenced.
iterator end()      { return iterator(); }
static void tls_cleanup( publication_record_type * pRec )
// pRec is thread-local data; it should be excluded from the publication list
if ( pRec->nState.load(memory_model::memory_order_relaxed) == active && pRec->pOwner ) {
// the record is active and the kernel is alive
unsigned int nState = active;
pRec->nState.compare_exchange_strong( nState, removed, memory_model::memory_order_release, atomics::memory_order_relaxed );
// the record is not in the publication list, or the kernel has already been deleted
cxx11_allocator().Delete( pRec );
assert( m_pThreadRec.get() == nullptr );
publication_record_type * pRec = cxx11_allocator().New();
m_pThreadRec.reset( pRec );
m_Stat.onCreatePubRecord();

void publish( publication_record_type * pRec )
assert( pRec->nState.load( memory_model::memory_order_relaxed ) == inactive );

pRec->nAge = m_nCount;
pRec->nState.store( active, memory_model::memory_order_release );

// Insert the record into the publication list
if ( m_pHead != static_cast<publication_record *>(pRec) ) {
publication_record * p = m_pHead->pNext.load( memory_model::memory_order_relaxed );
if ( p != static_cast<publication_record *>( pRec )) {
// Failed CAS changes p
} while ( !m_pHead->pNext.compare_exchange_weak( p, static_cast<publication_record *>(pRec),
    memory_model::memory_order_release, atomics::memory_order_relaxed ));
m_Stat.onActivatPubRecord();

void republish( publication_record_type * pRec )
if ( pRec->nState.load( memory_model::memory_order_relaxed ) != active ) {
// The record has been excluded from the publication list. Reinsert it
template <class Container>
void try_combining( Container& owner, publication_record_type * pRec )
if ( m_Mutex.try_lock() ) {
// The thread becomes a combiner
lock_guard l( m_Mutex, std::adopt_lock_t() );

// The record pRec can be excluded from the publication list. Re-publish it

assert( pRec->nRequest.load( memory_model::memory_order_relaxed ) == req_Response );

// There is another combiner, wait while it executes our request
if ( !wait_for_combining( pRec ) ) {
// The thread becomes a combiner
lock_guard l( m_Mutex, std::adopt_lock_t() );

// The record pRec can be excluded from the publication list. Re-publish it

assert( pRec->nRequest.load( memory_model::memory_order_relaxed ) == req_Response );

template <class Container>
void try_batch_combining( Container& owner, publication_record_type * pRec )
if ( m_Mutex.try_lock() ) {
// The thread becomes a combiner
lock_guard l( m_Mutex, std::adopt_lock_t() );

// The record pRec can be excluded from the publication list. Re-publish it

batch_combining( owner );
assert( pRec->nRequest.load( memory_model::memory_order_relaxed ) == req_Response );

// There is another combiner, wait while it executes our request
if ( !wait_for_combining( pRec ) ) {
// The thread becomes a combiner
lock_guard l( m_Mutex, std::adopt_lock_t() );

// The record pRec can be excluded from the publication list. Re-publish it

batch_combining( owner );
assert( pRec->nRequest.load( memory_model::memory_order_relaxed ) == req_Response );

template <class Container>
void combining( Container& owner )
// The thread is a combiner
assert( !m_Mutex.try_lock() );

unsigned int const nCurAge = ++m_nCount;

for ( unsigned int nPass = 0; nPass < m_nCombinePassCount; ++nPass )
if ( !combining_pass( owner, nCurAge ))

m_Stat.onCombining();
if ( (nCurAge & m_nCompactFactor) == 0 )
compact_list( nCurAge );
template <class Container>
bool combining_pass( Container& owner, unsigned int nCurAge )
publication_record * pPrev = nullptr;
publication_record * p = m_pHead;
bool bOpDone = false;

switch ( p->nState.load( memory_model::memory_order_acquire )) {
if ( p->op() >= req_Operation ) {
owner.fc_apply( static_cast<publication_record_type *>(p) );
operation_done( *p );

// Only m_pHead can be inactive in the publication list
assert( p == m_pHead );

// The record should be removed
p = unlink_and_delete_record( pPrev, p );

// ??? That is impossible

p = p->pNext.load( memory_model::memory_order_acquire );
template <class Container>
void batch_combining( Container& owner )
// The thread is a combiner
assert( !m_Mutex.try_lock() );

unsigned int const nCurAge = ++m_nCount;

for ( unsigned int nPass = 0; nPass < m_nCombinePassCount; ++nPass )
owner.fc_process( begin(), end() );

combining_pass( owner, nCurAge );
m_Stat.onCombining();
if ( (nCurAge & m_nCompactFactor) == 0 )
compact_list( nCurAge );

bool wait_for_combining( publication_record_type * pRec )
while ( pRec->nRequest.load( memory_model::memory_order_acquire ) != req_Response ) {

// The record can be excluded from the publication list. Reinsert it

if ( m_Mutex.try_lock() ) {
if ( pRec->nRequest.load( memory_model::memory_order_acquire ) == req_Response ) {

// The thread becomes a combiner

void compact_list( unsigned int const nCurAge )
// Thinning the publication list
publication_record * pPrev = nullptr;
for ( publication_record * p = m_pHead; p; ) {
if ( p->nState.load( memory_model::memory_order_acquire ) == active && p->nAge + m_nCompactFactor < nCurAge ) {
publication_record * pNext = p->pNext.load( memory_model::memory_order_acquire );
if ( pPrev->pNext.compare_exchange_strong( p, pNext,
    memory_model::memory_order_release, atomics::memory_order_relaxed ))
p->nState.store( inactive, memory_model::memory_order_release );

m_Stat.onDeactivatePubRecord();

p = p->pNext.load( memory_model::memory_order_acquire );

m_Stat.onCompactPublicationList();

publication_record * unlink_and_delete_record( publication_record * pPrev, publication_record * p )
publication_record * pNext = p->pNext.load( memory_model::memory_order_acquire );
if ( pPrev->pNext.compare_exchange_strong( p, pNext,
    memory_model::memory_order_release, atomics::memory_order_relaxed ))
cxx11_allocator().Delete( static_cast<publication_record_type *>( p ));
m_Stat.onDeletePubRecord();

m_pHead = static_cast<publication_record_type *>( p->pNext.load( memory_model::memory_order_acquire ));
cxx11_allocator().Delete( static_cast<publication_record_type *>( p ));
m_Stat.onDeletePubRecord();

template <typename PubRecord>
void fc_apply( PubRecord * )

template <typename Iterator>
void fc_process( Iterator, Iterator )

} // namespace flat_combining
}} // namespace cds::algo

#endif // #ifndef __CDS_ALGO_FLAT_COMBINING_H