3 #ifndef CDSLIB_ALGO_FLAT_COMBINING_H
4 #define CDSLIB_ALGO_FLAT_COMBINING_H
7 #include <cds/algo/atomic.h>
8 #include <cds/details/allocator.h>
9 #include <cds/algo/backoff_strategy.h>
10 #include <cds/lock/spinlock.h>
11 #include <cds/opt/options.h>
12 #include <cds/algo/int_algo.h>
13 #include <boost/thread/tss.hpp> // thread_specific_ptr
15 namespace cds { namespace algo {
17 /// @defgroup cds_flat_combining_intrusive Intrusive flat combining containers
18 /// @defgroup cds_flat_combining_container Non-intrusive flat combining containers
22 @anchor cds_flat_combining_description
The flat combining (FC) technique was invented by Hendler, Incze, Shavit and Tzafrir in their
2010 paper <i>"Flat Combining and the Synchronization-Parallelism Tradeoff"</i>.
25 The technique converts a sequential data structure to its concurrent implementation.
26 A few structures are added to the sequential implementation: a <i>global lock</i>,
27 a <i>count</i> of the number of combining passes, and a pointer to the <i>head</i>
28 of a <i>publication list</i>. The publication list is a list of thread-local records
29 of a size proportional to the number of threads that are concurrently accessing the shared object.
31 Each thread \p t accessing the structure to perform an invocation of some method \p m
32 on the shared object executes the following sequence of steps:
34 <li>Write the invocation opcode and parameters (if any) of the method \p m to be applied
35 sequentially to the shared object in the <i>request</i> field of your thread local publication
36 record (there is no need to use a load-store memory barrier). The <i>request</i> field will later
37 be used to receive the response. If your thread local publication record is marked as active
38 continue to step 2, otherwise continue to step 5.</li>
39 <li>Check if the global lock is taken. If so (another thread is an active combiner), spin on the <i>request</i>
40 field waiting for a response to the invocation (one can add a yield at this point to allow other threads
on the same core to run). Every once in a while, while spinning, check that the lock is still taken and that your
42 record is active. If your record is inactive proceed to step 5. Once the response is available,
43 reset the request field to null and return the response.</li>
44 <li>If the lock is not taken, attempt to acquire it and become a combiner. If you fail,
45 return to spinning in step 2.</li>
46 <li>Otherwise, you hold the lock and are a combiner.
48 <li>Increment the combining pass count by one.</li>
49 <li>Execute a \p fc_apply() by traversing the publication list from the head,
50 combining all nonnull method call invocations, setting the <i>age</i> of each of these records
51 to the current <i>count</i>, applying the combined method calls to the structure D, and returning
52 responses to all the invocations. This traversal is guaranteed to be wait-free.</li>
53 <li>If the <i>count</i> is such that a cleanup needs to be performed, traverse the publication
54 list from the <i>head</i>. Starting from the second item (we always leave the item pointed to
55 by the head in the list), remove from the publication list all records whose <i>age</i> is
56 much smaller than the current <i>count</i>. This is done by removing the node and marking it
58 <li>Release the lock.</li>
60 <li>If you have no thread local publication record allocate one, marked as active. If you already
61 have one marked as inactive, mark it as active. Execute a store-load memory barrier. Proceed to insert
62 the record into the list with a successful CAS to the <i>head</i>. Then proceed to step 1.</li>
65 As the test results show, the flat combining technique is suitable for non-intrusive containers
66 like stack, queue, deque. For intrusive concurrent containers the flat combining demonstrates
67 less impressive results.
69 \ref cds_flat_combining_container "List of FC-based containers" in libcds.
71 \ref cds_flat_combining_intrusive "List of intrusive FC-based containers" in libcds.
73 namespace flat_combining {
/// Special values of publication_record::nRequest
    req_EmptyRecord,    ///< Publication record is empty: no operation is pending
    req_Response,       ///< Operation is done: the combiner has stored the response
    req_Operation       ///< First operation id for derived classes; user opcodes must be >= this value

/// publication_record state
    inactive,       ///< Record is inactive: not linked in the publication list (or deactivated by list compacting)
    active,         ///< Record is active: the combiner processes its requests
    removed         ///< Record should be removed: its owner thread has terminated (see tls_cleanup)

/// Record of publication list
/**
    Each data structure based on flat combining contains a class derived from \p %publication_record
*/
struct publication_record {
    atomics::atomic<unsigned int>           nRequest;   ///< Request field: opcode or req_* value (meaning depends on the data structure)
    atomics::atomic<unsigned int>           nState;     ///< Record state: inactive, active, removed
    unsigned int                            nAge;       ///< Age of the record: combining-pass count at last use (consulted by list compacting)
    atomics::atomic<publication_record *>   pNext;      ///< Next record in publication list
    void *                                  pOwner;     ///< [internal data] Pointer to \ref kernel object that manages the publication list

    /// Initializes publication record
        : nRequest( req_EmptyRecord )

    /// Returns the value of \p nRequest field
    unsigned int op() const
        return nRequest.load( atomics::memory_order_relaxed );

    /// Checks if the operation is done (the request field contains \p req_Response)
        return nRequest.load( atomics::memory_order_relaxed ) == req_Response;
/// Flat combining internal statistics
template <typename Counter = cds::atomicity::event_counter >
    typedef Counter counter_type;   ///< Event counter type

    counter_type    m_nOperationCount;          ///< How many operations have been performed
    counter_type    m_nCombiningCount;          ///< Combining call count
    counter_type    m_nCompactPublicationList;  ///< Count of publication list compacting
    counter_type    m_nDeactivatePubRecord;     ///< How many publication records were deactivated during compacting
    counter_type    m_nActivatePubRecord;       ///< Count of publication record activating
    counter_type    m_nPubRecordCreated;        ///< Count of created publication records
    counter_type    m_nPubRecordDeteted;        ///< Count of deleted publication records. NOTE(review): name has a typo ("Deteted"); it is part of the public stat layout, renaming would break users
    counter_type    m_nAcquirePubRecCount;      ///< Count of acquiring publication record
    counter_type    m_nReleasePubRecCount;      ///< Count of releasing publication record

    /// Returns current combining factor
    /**
        The combining factor is how many operations are performed in one combining pass:
        <tt>combining_factor := m_nOperationCount / m_nCombiningCount</tt>
    */
    double combining_factor() const
        // Guard against division by zero when no combining pass has happened yet
        return m_nCombiningCount.get() ? double( m_nOperationCount.get()) / m_nCombiningCount.get() : 0.0;

    void onOperation()              { ++m_nOperationCount;         }    // an operation has been requested
    void onCombining()              { ++m_nCombiningCount;         }    // a combining series has been executed
    void onCompactPublicationList() { ++m_nCompactPublicationList; }    // the publication list has been compacted
    void onDeactivatePubRecord()    { ++m_nDeactivatePubRecord;    }    // a record has been deactivated by compacting
    void onActivatPubRecord()       { ++m_nActivatePubRecord;      }    // a record has been (re)inserted into the list. NOTE(review): name has a typo ("Activat"); kept in sync with empty_stat and call sites
    void onCreatePubRecord()        { ++m_nPubRecordCreated;       }    // a new record has been allocated
    void onDeletePubRecord()        { ++m_nPubRecordDeteted;       }    // a record has been deleted
    void onAcquirePubRecord()       { ++m_nAcquirePubRecCount;     }    // acquire_record() has been called
    void onReleasePubRecord()       { ++m_nReleasePubRecCount;     }    // release_record() has been called
/// Flat combining dummy internal statistics
    // All event handlers are no-ops; this is the default for traits::stat.
    // The interface mirrors \ref stat exactly (including the historical typo in onActivatPubRecord).
    void onOperation()              {}
    void onCombining()              {}
    void onCompactPublicationList() {}
    void onDeactivatePubRecord()    {}
    void onActivatPubRecord()       {}
    void onCreatePubRecord()        {}
    void onDeletePubRecord()        {}
    void onAcquirePubRecord()       {}
    void onReleasePubRecord()       {}
/// Type traits of \ref kernel class
/**
    You can define different type traits for \ref kernel
    by specifying your struct based on \p %traits
    or by using \ref make_traits metafunction.
*/
    typedef cds::lock::Spin             lock_type;      ///< Lock type (the global combiner lock)
    typedef cds::backoff::delay_of<2>   back_off;       ///< Back-off strategy (used while waiting for a combiner)
    typedef CDS_DEFAULT_ALLOCATOR       allocator;      ///< Allocator used for TLS data (allocating publication_record derivatives)
    typedef empty_stat                  stat;           ///< Internal statistics
    typedef opt::v::relaxed_ordering    memory_model;   ///< C++ memory ordering model
/// Metafunction converting option list to traits
/**
    Supported \p Options are:
    - \p opt::lock_type - mutex type, default is \p cds::lock::Spin
    - \p opt::back_off - back-off strategy, default is \p cds::backoff::delay_of<2>
    - \p opt::allocator - allocator type, default is \ref CDS_DEFAULT_ALLOCATOR
    - \p opt::stat - internal statistics, possible type: \ref stat, \ref empty_stat (the default)
    - \p opt::memory_model - C++ memory ordering model.
        For the list of all available memory orderings see opt::memory_model.
        Default is cds::opt::v::relaxed_ordering
*/
template <typename... Options>
# ifdef CDS_DOXYGEN_INVOKED
    typedef implementation_defined type ;   ///< Metafunction result
    typedef typename cds::opt::make_options<
        typename cds::opt::find_type_traits< traits, Options... >::type
/// The kernel of flat combining
/**
    Template parameters:
    - \p PublicationRecord - a type derived from \ref publication_record
    - \p Traits - a type traits of flat combining, default is \p flat_combining::traits.
        \ref make_traits metafunction can be used to create type traits

    The kernel object should be a member of a container class. The container cooperates with the flat combining
    kernel object. There are two ways to interact with the kernel:
    - One-by-one processing of the active records of the publication list. This mode provides the \ref combine function:
        the container acquires its publication record by \ref acquire_record, fills its fields and calls
        the \p combine function of its kernel object. If the current thread becomes a combiner, the kernel
        calls the \p fc_apply function of the container for each active non-empty record. Then, the container
        should release its publication record by \ref release_record. Only one pass through the publication
        list is performed.
    - Batch processing (\ref batch_combine function). In this mode the container obtains access
        to the entire publication list. This mode allows the container to perform an elimination, for example,
        the stack can collide \p push and \p pop requests. The sequence of invocations is the following:
        the container acquires its publication record by \ref acquire_record, fills its fields and calls
        the \p batch_combine function of its kernel object. If the current thread becomes a combiner,
        the kernel calls the \p fc_process function of the container passing two iterators pointing to
        the begin and the end of the publication list (see \ref iterator class). The iterators allow
        multiple passes through the active records of the publication list. For each processed record the container
        should call the \ref operation_done function. At the end, the container should release
        its record by \ref release_record.
*/
template <
    typename PublicationRecord
    ,typename Traits = traits
    typedef PublicationRecord   publication_record_type;    ///< publication record type
    typedef Traits              traits;                     ///< Type traits
    typedef typename traits::lock_type      global_lock_type;   ///< Global lock type
    typedef typename traits::back_off       back_off;           ///< back-off strategy type
    typedef typename traits::allocator      allocator;          ///< Allocator type (used for allocating publication_record_type data)
    typedef typename traits::stat           stat;               ///< Internal statistics
    typedef typename traits::memory_model   memory_model;       ///< C++ memory model

    typedef cds::details::Allocator< publication_record_type, allocator > cxx11_allocator;  ///< internal helper cds::details::Allocator
    typedef std::lock_guard<global_lock_type> lock_guard;       // RAII guard over the global combiner mutex

    unsigned int    m_nCount;   ///< Count of combining passes; serves as the monotonic "age" clock of the publication list
    publication_record_type *   m_pHead;    ///< Head of publication list
    boost::thread_specific_ptr< publication_record_type >   m_pThreadRec;   ///< Thread-local publication record
    mutable global_lock_type    m_Mutex;    ///< Global mutex (held by the combiner thread)
    mutable stat                m_Stat;     ///< Internal statistics
    unsigned int const  m_nCompactFactor;       ///< Publication list compacting factor, stored as a binary mask: the list is compacted every \p %m_nCompactFactor + 1 combining passes
    unsigned int const  m_nCombinePassCount;    ///< Number of combining passes performed per combining() call
/// Initializes the object
/**
    Default compact factor = 64 (stored as mask 63)
    Combiner pass count = 8
*/
    , m_pThreadRec( tls_cleanup )   // TLS destructor marks the record as removed rather than freeing it while it may still be linked
    , m_nCompactFactor( 64 - 1 )    // binary mask: compact the list every 64 combining passes
    , m_nCombinePassCount( 8 )

/// Initializes the object
    unsigned int nCompactFactor     ///< Publication list compacting factor (the list will be compacted through \p nCompactFactor combining passes)
    ,unsigned int nCombinePassCount ///< Number of combining passes for combiner thread
    , m_pThreadRec( tls_cleanup )
    // Round the factor up to a power of two, minus one, so it can be used as a binary mask
    , m_nCompactFactor( (unsigned int)( cds::beans::ceil2( nCompactFactor ) - 1 )) // binary mask
    , m_nCombinePassCount( nCombinePassCount )

/// Destroys the object and marks all publication records as detached
    // mark all publication records as detached (owner kernel is going away)
    for ( publication_record * p = m_pHead; p; p = p->pNext.load( memory_model::memory_order_relaxed ))
/// Gets the publication list record for the current thread
/**
    If there is no publication record for the current thread
    the function allocates it.
*/
publication_record_type * acquire_record()
    publication_record_type * pRec = m_pThreadRec.get();
        // First call in this thread: allocate a new publication record and store it in TLS
        pRec = cxx11_allocator().New();
        pRec->pOwner = reinterpret_cast<void *>( this );    // lets tls_cleanup() tell whether the kernel is still alive
        m_pThreadRec.reset( pRec );
        m_Stat.onCreatePubRecord();

    // If the record is not active (new, or excluded from the list by compacting), it must be (re)published
    if ( pRec->nState.load( memory_model::memory_order_acquire ) != active )

    assert( pRec->nRequest.load( memory_model::memory_order_relaxed ) == req_EmptyRecord );
    m_Stat.onAcquirePubRecord();

/// Marks the publication record for the current thread as empty
void release_record( publication_record_type * pRec )
    assert( pRec->is_done() );  // the operation must be completed before the record is released
    pRec->nRequest.store( req_EmptyRecord, memory_model::memory_order_relaxed );
    m_Stat.onReleasePubRecord();
/// Trying to execute operation \p nOpId
/**
    \p pRec is the publication record acquired by \ref acquire_record earlier.
    \p owner is the container that owns the flat combining kernel object.
    As a result the current thread either becomes a combiner or waits until
    another combiner performs the \p pRec operation.

    If the thread becomes a combiner, the kernel calls \p owner.fc_apply
    for each active non-empty publication record.
*/
template <class Container>
void combine( unsigned int nOpId, publication_record_type * pRec, Container& owner )
    // User opcodes must not clash with the reserved req_* values
    assert( nOpId >= req_Operation );
    //assert( pRec->nState.load( memory_model::memory_order_relaxed ) == active );
    // Publish the request; release ordering makes the operation's arguments visible to the combiner
    pRec->nRequest.store( nOpId, memory_model::memory_order_release );
    m_Stat.onOperation();
    try_combining( owner, pRec );

/// Trying to execute operation \p nOpId in batch-combine mode
/**
    \p pRec is the publication record acquired by \ref acquire_record earlier.
    \p owner is the container that owns the flat combining kernel object.
    As a result the current thread either becomes a combiner or waits until
    another combiner performs the \p pRec operation.

    If the thread becomes a combiner, the kernel calls \p owner.fc_process
    giving the container full access to the publication list. This function
    is useful for an elimination technique if the container supports any kind of
    that. The container can perform multiple passes through the publication list.

    \p owner.fc_process has two arguments - forward iterators to the begin and end of the
    publication list, see \ref iterator class. For each processed record the container
    should call the \ref operation_done function to mark the record as processed.

    At the end of \p %batch_combine the \ref combine function is called
    to process the rest of the publication records.
*/
template <class Container>
void batch_combine( unsigned int nOpId, publication_record_type * pRec, Container& owner )
    assert( nOpId >= req_Operation );
    //assert( pRec->nState.load( memory_model::memory_order_relaxed ) == active );
    pRec->nRequest.store( nOpId, memory_model::memory_order_release );
    m_Stat.onOperation();
    try_batch_combining( owner, pRec );
/// Waits for the end of combining
void wait_while_combining() const
    lock_guard l( m_Mutex );    // the global lock can only be acquired when no combiner is active

/// Marks \p rec as executed
/**
    This function should be called by the container if batch_combine mode is used.
    For usual combining (see \ref combine) this function is excess.
*/
void operation_done( publication_record& rec )
    // Release ordering: the waiting thread reads the response with acquire (see wait_for_combining)
    rec.nRequest.store( req_Response, memory_model::memory_order_release );

/// Internal statistics
stat const& statistics() const

// For container classes based on flat combining: mutable access to the statistics
stat& internal_statistics() const

/// Returns the compact factor
unsigned int compact_factor() const
    // m_nCompactFactor is stored as a binary mask, hence the +1
    return m_nCompactFactor + 1;

/// Returns the number of combining passes for the combiner thread
unsigned int combine_pass_count() const
    return m_nCombinePassCount;
/// Publication list iterator
/**
    Iterators are intended for batch processing by the container's
    \p fc_process function.
    The iterator allows iterating through the active publication list.
*/
    publication_record_type * m_pRec;   // current record; nullptr for the end iterator

    iterator( publication_record_type * pRec )

    // Skip records that are not active or carry no pending user operation
    while ( m_pRec && (m_pRec->nState.load( memory_model::memory_order_acquire ) != active
                    || m_pRec->nRequest.load( memory_model::memory_order_relaxed) < req_Operation ))
        m_pRec = static_cast<publication_record_type *>(m_pRec->pNext.load( memory_model::memory_order_acquire ));

/// Initializes an empty iterator object

/// Copy constructor
iterator( iterator const& src )
    : m_pRec( src.m_pRec )

/// Pre-increment: advances to the next active record
iterator& operator++()
    m_pRec = static_cast<publication_record_type *>( m_pRec->pNext.load( memory_model::memory_order_acquire ));

/// Post-increment
iterator operator++(int)

/// Dereference operator, can return \p nullptr
publication_record_type * operator ->()

/// Dereference operator, the iterator should not be an end iterator
publication_record_type& operator*()

/// Iterator equality
friend bool operator==( iterator it1, iterator it2 )
    return it1.m_pRec == it2.m_pRec;

/// Iterator inequality
friend bool operator!=( iterator it1, iterator it2 )
    return !( it1 == it2 );

/// Returns an iterator to the first active publication record
iterator begin() { return iterator(m_pHead); }

/// Returns an iterator to the end of the publication list. Should not be dereferenced.
iterator end() { return iterator(); }
// TLS destructor: called when a thread terminates while still owning a publication record
static void tls_cleanup( publication_record_type * pRec )
    // pRec, being TLS data, must be excluded from the publication list.
    // It cannot be freed here if it is still linked - the combiner unlinks and deletes it later.
    if ( pRec->nState.load(memory_model::memory_order_relaxed) == active && pRec->pOwner ) {
        // record is active and the kernel is alive: hand removal over to the combiner
        unsigned int nState = active;
        pRec->nState.compare_exchange_strong( nState, removed, memory_model::memory_order_release, atomics::memory_order_relaxed );
        // record is not in the publication list, or the kernel is already deleted: safe to free now
        cxx11_allocator().Delete( pRec );

    // One-time allocation of the thread-local publication record
    assert( m_pThreadRec.get() == nullptr );
    publication_record_type * pRec = cxx11_allocator().New();
    m_pThreadRec.reset( pRec );
    m_Stat.onCreatePubRecord();

// Inserts \p pRec into the publication list and marks it active
void publish( publication_record_type * pRec )
    assert( pRec->nState.load( memory_model::memory_order_relaxed ) == inactive );
    // Refresh the age so the freshly published record is not immediately compacted away
    pRec->nAge = m_nCount;
    pRec->nState.store( active, memory_model::memory_order_release );

    // Insert the record into the publication list just after the head (CAS on m_pHead->pNext)
    if ( m_pHead != static_cast<publication_record *>(pRec) ) {
        publication_record * p = m_pHead->pNext.load(memory_model::memory_order_relaxed);
        if ( p != static_cast<publication_record *>( pRec )) {
            // A failed CAS reloads p with the current head successor
            } while ( !m_pHead->pNext.compare_exchange_weak( p, static_cast<publication_record *>(pRec),
                memory_model::memory_order_release, atomics::memory_order_relaxed ));
            m_Stat.onActivatPubRecord();

// Re-publishes \p pRec if it has been excluded from the publication list (e.g. by compacting)
void republish( publication_record_type * pRec )
    if ( pRec->nState.load( memory_model::memory_order_relaxed ) != active ) {
        // The record has been excluded from the publication list. Reinsert it
// Either becomes the combiner (one-by-one mode) or waits until another combiner serves pRec
template <class Container>
void try_combining( Container& owner, publication_record_type * pRec )
    if ( m_Mutex.try_lock() ) {
        // The thread becomes a combiner
        lock_guard l( m_Mutex, std::adopt_lock_t() );   // adopt: already locked by try_lock()
        // The record pRec can have been excluded from the publication list. Re-publish it
        assert( pRec->nRequest.load( memory_model::memory_order_relaxed ) == req_Response );
        // There is another combiner; wait while it executes our request
        if ( !wait_for_combining( pRec ) ) {
            // The thread becomes a combiner (wait_for_combining() returned holding the mutex)
            lock_guard l( m_Mutex, std::adopt_lock_t() );
            // The record pRec can have been excluded from the publication list. Re-publish it
            assert( pRec->nRequest.load( memory_model::memory_order_relaxed ) == req_Response );

// Same as try_combining() but gives the container batch access via batch_combining()
template <class Container>
void try_batch_combining( Container& owner, publication_record_type * pRec )
    if ( m_Mutex.try_lock() ) {
        // The thread becomes a combiner
        lock_guard l( m_Mutex, std::adopt_lock_t() );
        // The record pRec can have been excluded from the publication list. Re-publish it
        batch_combining( owner );
        assert( pRec->nRequest.load( memory_model::memory_order_relaxed ) == req_Response );
        // There is another combiner; wait while it executes our request
        if ( !wait_for_combining( pRec ) ) {
            // The thread becomes a combiner (wait_for_combining() returned holding the mutex)
            lock_guard l( m_Mutex, std::adopt_lock_t() );
            // The record pRec can have been excluded from the publication list. Re-publish it
            batch_combining( owner );
            assert( pRec->nRequest.load( memory_model::memory_order_relaxed ) == req_Response );
// Combiner body for one-by-one mode: runs up to m_nCombinePassCount passes over the list
template <class Container>
void combining( Container& owner )
    // The thread is a combiner (the global mutex must already be held)
    assert( !m_Mutex.try_lock() );

    // Advance the age clock; records processed now are stamped with this age
    unsigned int const nCurAge = ++m_nCount;

    // Perform combining passes; when a pass does no work, further passes are presumably skipped
    for ( unsigned int nPass = 0; nPass < m_nCombinePassCount; ++nPass )
        if ( !combining_pass( owner, nCurAge ))

    m_Stat.onCombining();
    // m_nCompactFactor is a binary mask: compact the list every (mask + 1) passes
    if ( (nCurAge & m_nCompactFactor) == 0 )
        compact_list( nCurAge );

// One pass over the publication list; returns true if at least one operation was executed
template <class Container>
bool combining_pass( Container& owner, unsigned int nCurAge )
    publication_record * pPrev = nullptr;
    publication_record * p = m_pHead;
    bool bOpDone = false;   // did this pass execute at least one operation?
    switch ( p->nState.load( memory_model::memory_order_acquire )) {
        // active record with a pending user operation: apply it and signal completion
        if ( p->op() >= req_Operation ) {
            owner.fc_apply( static_cast<publication_record_type *>(p) );
            operation_done( *p );
        // Only m_pHead can be inactive in the publication list
        assert( p == m_pHead );
        // The record should be removed: its owner thread has terminated (see tls_cleanup)
        p = unlink_and_delete_record( pPrev, p );
        /// ??? That is impossible
    p = p->pNext.load( memory_model::memory_order_acquire );

// Combiner body for batch mode: fc_process() passes, then a final one-by-one pass
template <class Container>
void batch_combining( Container& owner )
    // The thread is a combiner (the global mutex must already be held)
    assert( !m_Mutex.try_lock() );

    unsigned int const nCurAge = ++m_nCount;

    // Give the container several passes over the whole list (e.g. for elimination)
    for ( unsigned int nPass = 0; nPass < m_nCombinePassCount; ++nPass )
        owner.fc_process( begin(), end() );

    // Process the records the container left unprocessed
    combining_pass( owner, nCurAge );
    m_Stat.onCombining();
    if ( (nCurAge & m_nCompactFactor) == 0 )
        compact_list( nCurAge );
// Spins until the combiner serves pRec (returns true), or until this thread acquires the
// global mutex itself (returns false, mutex held by the caller)
bool wait_for_combining( publication_record_type * pRec )
    while ( pRec->nRequest.load( memory_model::memory_order_acquire ) != req_Response ) {
        // The record can have been excluded from the publication list. Reinsert it

        // Try to become the combiner ourselves
        if ( m_Mutex.try_lock() ) {
            if ( pRec->nRequest.load( memory_model::memory_order_acquire ) == req_Response ) {
                // Our request was served while we were acquiring the lock
            // The thread becomes a combiner (returns false with the mutex held)

// Thins the publication list: deactivates active records that have not been used
// for more than m_nCompactFactor combining passes
void compact_list( unsigned int const nCurAge )
    // Thinning publication list
    publication_record * pPrev = nullptr;
    for ( publication_record * p = m_pHead; p; ) {
        if ( p->nState.load( memory_model::memory_order_acquire ) == active && p->nAge + m_nCompactFactor < nCurAge ) {
            publication_record * pNext = p->pNext.load( memory_model::memory_order_acquire );
            if ( pPrev->pNext.compare_exchange_strong( p, pNext,
                memory_model::memory_order_release, atomics::memory_order_relaxed ))
                // Unlinked: mark inactive so the owner thread re-publishes it on next acquire_record()
                p->nState.store( inactive, memory_model::memory_order_release );
                m_Stat.onDeactivatePubRecord();
        p = p->pNext.load( memory_model::memory_order_acquire );
    m_Stat.onCompactPublicationList();

// Unlinks and frees a record whose owner thread has terminated;
// returns the next record to visit (see combining_pass)
publication_record * unlink_and_delete_record( publication_record * pPrev, publication_record * p )
    publication_record * pNext = p->pNext.load( memory_model::memory_order_acquire );
    if ( pPrev->pNext.compare_exchange_strong( p, pNext,
        memory_model::memory_order_release, atomics::memory_order_relaxed ))
        cxx11_allocator().Delete( static_cast<publication_record_type *>( p ));
        m_Stat.onDeletePubRecord();

    // Presumably the pPrev == nullptr case: p is the head, so advance the head and delete p
    m_pHead = static_cast<publication_record_type *>( p->pNext.load( memory_model::memory_order_acquire ));
    cxx11_allocator().Delete( static_cast<publication_record_type *>( p ));
    m_Stat.onDeletePubRecord();
// Example/default container callbacks: a real container must implement
// fc_apply() (one-by-one mode) and fc_process() (batch mode).
template <typename PubRecord>
void fc_apply( PubRecord * )

template <typename Iterator>
void fc_process( Iterator, Iterator )
804 } // namespace flat_combining
805 }} // namespace cds::algo
807 #endif // #ifndef CDSLIB_ALGO_FLAT_COMBINING_H