/*
    This file is a part of libcds - Concurrent Data Structures library

    (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2017

    Source code repo: http://github.com/khizmax/libcds/
    Download: http://sourceforge.net/projects/libcds/files/

    Redistribution and use in source and binary forms, with or without
    modification, are permitted provided that the following conditions are met:

    * Redistributions of source code must retain the above copyright notice, this
      list of conditions and the following disclaimer.

    * Redistributions in binary form must reproduce the above copyright notice,
      this list of conditions and the following disclaimer in the documentation
      and/or other materials provided with the distribution.

    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
    AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
    IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
    DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
    FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
    DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
    SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
    OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
    OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef CDSLIB_ALGO_FLAT_COMBINING_KERNEL_H
#define CDSLIB_ALGO_FLAT_COMBINING_KERNEL_H

#include <cds/algo/flat_combining/defs.h>
#include <cds/algo/flat_combining/wait_strategy.h>

#include <cds/sync/spinlock.h>
#include <cds/details/allocator.h>
#include <cds/opt/options.h>
#include <cds/algo/int_algo.h>

#include <boost/thread/tss.hpp>     // thread_specific_ptr
namespace cds { namespace algo {

    /// @defgroup cds_flat_combining_intrusive Intrusive flat combining containers
    /// @defgroup cds_flat_combining_container Non-intrusive flat combining containers
    /** Flat combining

        @anchor cds_flat_combining_description
        The flat combining (FC) technique was invented by Hendler, Incze, Shavit and Tzafrir in their paper
        [2010] <i>"Flat Combining and the Synchronization-Parallelism Tradeoff"</i>.
        The technique converts a sequential data structure into its concurrent implementation.
        A few structures are added to the sequential implementation: a <i>global lock</i>,
        a <i>count</i> of the number of combining passes, and a pointer to the <i>head</i>
        of a <i>publication list</i>. The publication list is a list of thread-local records
        of a size proportional to the number of threads that are concurrently accessing the shared object.

        Each thread \p t accessing the structure to perform an invocation of some method \p f()
        on the shared object executes the following sequence of steps:
        <ol>
        <li>Write the invocation opcode and parameters (if any) of the method \p f() to be applied
        sequentially to the shared object in the <i>request</i> field of your thread-local publication
        record (there is no need to use a load-store memory barrier). The <i>request</i> field will later
        be used to receive the response. If your thread-local publication record is marked as active,
        continue to step 2, otherwise continue to step 5.</li>
        <li>Check if the global lock is taken. If so (another thread is an active combiner), spin on the <i>request</i>
        field waiting for a response to the invocation (one can add a yield at this point to allow other threads
        on the same core to run). While spinning, check once in a while whether the lock is still taken and your
        record is still active (you may use any of \p wait_strategy instead of spinning). If your record is inactive, proceed to step 5.
        Once the response is available, reset the request field to null and return the response.</li>
        <li>If the lock is not taken, attempt to acquire it and become a combiner. If you fail,
        return to spinning in step 2.</li>
        <li>Otherwise, you hold the lock and are a combiner.
        <ul>
        <li>Increment the combining pass count by one.</li>
        <li>Execute a \p fc_apply() by traversing the publication list from the head,
        combining all non-null method call invocations, setting the <i>age</i> of each of these records
        to the current <i>count</i>, applying the combined method calls to the structure D, and returning
        responses to all the invocations. This traversal is guaranteed to be wait-free.</li>
        <li>If the <i>count</i> is such that a cleanup needs to be performed, traverse the publication
        list from the <i>head</i>. Starting from the second item (we always leave the item pointed to
        by the head in the list), remove from the publication list all records whose <i>age</i> is
        much smaller than the current <i>count</i>. This is done by removing the node and marking it
        as inactive.</li>
        <li>Release the lock.</li>
        </ul></li>
        <li>If you have no thread-local publication record, allocate one, marked as active. If you already
        have one marked as inactive, mark it as active. Execute a store-load memory barrier. Proceed to insert
        the record into the list with a successful CAS to the <i>head</i>. Then proceed to step 1.</li>
        </ol>
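
        The following is a minimal illustrative sketch of this per-thread protocol (NOT the
        libcds implementation; all names in it are hypothetical, and record activation,
        aging and list compaction from steps 4-5 are omitted). The shared object is a plain
        counter and the only operation is "add":
        \code
        #include <atomic>
        #include <mutex>

        struct PubRecord {
            std::atomic<int>        request{ 0 };   // 0 - empty, >0 - pending argument, -1 - response ready
            std::atomic<PubRecord*> next{ nullptr };
        };

        std::mutex              g_combinerLock;      // the global lock
        std::atomic<PubRecord*> g_head{ nullptr };   // head of the publication list
        int                     g_counter = 0;       // the sequential structure

        // Step 5: insert the thread's record with a CAS to the head (called once per thread)
        void enlist( PubRecord* rec ) {
            PubRecord* h = g_head.load();
            do {
                rec->next.store( h );
            } while ( !g_head.compare_exchange_weak( h, rec ));
        }

        void fc_add( PubRecord* rec, int arg ) {     // arg must be > 0 in this sketch
            rec->request.store( arg );               // step 1: publish the invocation
            while ( rec->request.load() != -1 ) {    // step 2: wait for the response
                if ( g_combinerLock.try_lock()) {    // step 3: try to become the combiner
                    // step 4: one combining pass - apply all pending requests
                    for ( PubRecord* p = g_head.load(); p; p = p->next.load()) {
                        int a = p->request.load();
                        if ( a > 0 ) {
                            g_counter += a;          // apply to the sequential structure
                            p->request.store( -1 );  // publish the response
                        }
                    }
                    g_combinerLock.unlock();
                }
            }
            rec->request.store( 0 );                 // reset the request field
        }
        \endcode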

        As the test results show, the flat combining technique is well suited to non-intrusive containers
        such as stacks, queues, and deques. For intrusive concurrent containers flat combining demonstrates
        less impressive results.

        \ref cds_flat_combining_container "List of FC-based containers" in libcds.

        \ref cds_flat_combining_intrusive "List of intrusive FC-based containers" in libcds.
    */
    namespace flat_combining {
        /// Flat combining internal statistics
        template <typename Counter = cds::atomicity::event_counter >
        struct stat
        {
            typedef Counter counter_type;   ///< Event counter type

            counter_type    m_nOperationCount;          ///< How many operations have been performed
            counter_type    m_nCombiningCount;          ///< Combining call count
            counter_type    m_nCompactPublicationList;  ///< Count of publication list compacting
            counter_type    m_nDeactivatePubRecord;     ///< How many publication records were deactivated during compacting
            counter_type    m_nActivatePubRecord;       ///< Count of publication record activating
            counter_type    m_nPubRecordCreated;        ///< Count of created publication records
            counter_type    m_nPubRecordDeleted;        ///< Count of deleted publication records
            counter_type    m_nPassiveWaitCall;         ///< Count of passive waiting calls (\p kernel::wait_for_combining())
            counter_type    m_nPassiveWaitIteration;    ///< Count of iterations inside passive waiting
            counter_type    m_nPassiveWaitWakeup;       ///< Count of forced wake-ups of the passive wait cycle
            counter_type    m_nInvokeExclusive;         ///< Count of calls of \p kernel::invoke_exclusive()
            counter_type    m_nWakeupByNotifying;       ///< How many times a passive thread was woken up by a notification
            counter_type    m_nPassiveToCombiner;       ///< How many times a passive thread becomes the combiner

            /// Returns current combining factor
            /**
                The combining factor is the average number of operations performed in one combining pass:
                <tt>combining_factor := m_nOperationCount / m_nCombiningCount</tt>
            */
            double combining_factor() const
            {
                return m_nCombiningCount.get() ? double( m_nOperationCount.get()) / m_nCombiningCount.get() : 0.0;
            }
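
            // Worked example: with m_nOperationCount == 1000 and m_nCombiningCount == 250,
            // combining_factor() == 1000 / 250 == 4.0, i.e. on average four operations
            // are applied to the data structure per combining pass.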

            //@cond
            void    onOperation()               { ++m_nOperationCount;          }
            void    onCombining()               { ++m_nCombiningCount;          }
            void    onCompactPublicationList()  { ++m_nCompactPublicationList;  }
            void    onDeactivatePubRecord()     { ++m_nDeactivatePubRecord;     }
            void    onActivatePubRecord()       { ++m_nActivatePubRecord;       }
            void    onCreatePubRecord()         { ++m_nPubRecordCreated;        }
            void    onDeletePubRecord()         { ++m_nPubRecordDeleted;        }
            void    onPassiveWait()             { ++m_nPassiveWaitCall;         }
            void    onPassiveWaitIteration()    { ++m_nPassiveWaitIteration;    }
            void    onPassiveWaitWakeup()       { ++m_nPassiveWaitWakeup;       }
            void    onInvokeExclusive()         { ++m_nInvokeExclusive;         }
            void    onWakeupByNotifying()       { ++m_nWakeupByNotifying;       }
            void    onPassiveToCombiner()       { ++m_nPassiveToCombiner;       }
            //@endcond
        };
        /// Flat combining dummy internal statistics
        struct empty_stat
        {
            //@cond
            void    onOperation()               const {}
            void    onCombining()               const {}
            void    onCompactPublicationList()  const {}
            void    onDeactivatePubRecord()     const {}
            void    onActivatePubRecord()       const {}
            void    onCreatePubRecord()         const {}
            void    onDeletePubRecord()         const {}
            void    onPassiveWait()             const {}
            void    onPassiveWaitIteration()    const {}
            void    onPassiveWaitWakeup()       const {}
            void    onInvokeExclusive()         const {}
            void    onWakeupByNotifying()       const {}
            void    onPassiveToCombiner()       const {}
            //@endcond
        };
        /// Type traits of \ref kernel class
        /**
            You can define different type traits for \ref kernel
            by specifying your own struct based on \p %traits
            or by using the \ref make_traits metafunction.
        */
        struct traits
        {
            typedef cds::sync::spin             lock_type;      ///< Lock type
            typedef cds::algo::flat_combining::wait_strategy::backoff< cds::backoff::delay_of<2>> wait_strategy; ///< Wait strategy
            typedef CDS_DEFAULT_ALLOCATOR       allocator;      ///< Allocator used for TLS data (allocating \p publication_record derivatives)
            typedef empty_stat                  stat;           ///< Internal statistics
            typedef opt::v::relaxed_ordering    memory_model;   ///< C++ memory ordering model
        };
        /// Metafunction converting option list to traits
        /**
            \p Options are:
            - \p opt::lock_type - mutex type, default is \p cds::sync::spin
            - \p opt::wait_strategy - wait strategy, see \p wait_strategy namespace, default is \p wait_strategy::backoff.
            - \p opt::allocator - allocator type, default is \ref CDS_DEFAULT_ALLOCATOR
            - \p opt::stat - internal statistics, possible types: \ref stat, \ref empty_stat (the default)
            - \p opt::memory_model - C++ memory ordering model.
                For the list of all available memory ordering models see \p opt::memory_model.
                Default is \p cds::opt::v::relaxed_ordering
        */
        template <typename... Options>
        struct make_traits {
#   ifdef CDS_DOXYGEN_INVOKED
            typedef implementation_defined type;    ///< Metafunction result
#   else
            typedef typename cds::opt::make_options<
                typename cds::opt::find_type_traits< traits, Options... >::type
                , Options...
            >::type type;
#   endif
        };
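
        /*
            Example (illustrative sketch): declaring a kernel whose traits enable
            internal statistics via make_traits. The record type my_publication_record
            is a hypothetical placeholder for a type derived from publication_record:

                typedef cds::algo::flat_combining::kernel<
                    my_publication_record,
                    cds::algo::flat_combining::make_traits<
                        cds::opt::stat< cds::algo::flat_combining::stat<>>
                    >::type
                > my_fc_kernel;
        */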
        /// The kernel of flat combining
        /**
            Template parameters:
            - \p PublicationRecord - a type derived from \ref publication_record
            - \p Traits - a type traits of flat combining, default is \p flat_combining::traits.
              The \ref make_traits metafunction can be used to create type traits.

            The kernel object should be a member of a container class. The container cooperates with the
            flat combining kernel object. There are two ways to interact with the kernel:
            - One-by-one processing of the active records of the publication list. This mode is provided by
              the \p combine() function: the container acquires its publication record by \p acquire_record(),
              fills its fields and calls the \p combine() function of its kernel object. If the current thread
              becomes a combiner, the kernel calls the \p fc_apply() function of the container for each active
              non-empty record. Then, the container should release its publication record by \p release_record().
              Only one pass through the publication list is performed (an illustrative sketch follows the
              \p combine() member below).
            - Batch processing - the \p batch_combine() function. In this mode the container obtains access
              to the entire publication list. This mode allows the container to perform elimination: for example,
              a stack can collide \p push() and \p pop() requests. The sequence of invocations is the following:
              the container acquires its publication record by \p acquire_record(), fills its fields and calls
              the \p batch_combine() function of its kernel object. If the current thread becomes a combiner,
              the kernel calls the \p fc_process() function of the container passing two iterators pointing to
              the beginning and the end of the publication list (see \ref iterator class). The iterators allow
              multiple passes over the active records of the publication list. For each processed record the
              container should call the \p operation_done() function. At the end, the container should release
              its record by \p release_record() (an illustrative \p fc_process() sketch follows the \p iterator
              class below).
        */
        template <
            typename PublicationRecord
            ,typename Traits = traits
        >
        class kernel
        {
        public:
            typedef Traits  traits;                                 ///< Type traits
            typedef typename traits::lock_type      global_lock_type;   ///< Global lock type
            typedef typename traits::wait_strategy  wait_strategy;      ///< Wait strategy type
            typedef typename traits::allocator      allocator;          ///< Allocator type (used for allocating publication_record_type data)
            typedef typename traits::stat           stat;               ///< Internal statistics
            typedef typename traits::memory_model   memory_model;       ///< C++ memory model

            typedef typename wait_strategy::template make_publication_record<PublicationRecord>::type publication_record_type; ///< Publication record type

        protected:
            //@cond
            typedef cds::details::Allocator< publication_record_type, allocator > cxx11_allocator; ///< internal helper cds::details::Allocator
            typedef std::lock_guard<global_lock_type> lock_guard;
            //@endcond

        protected:
            atomics::atomic<unsigned int>   m_nCount;           ///< Total count of combining passes. Used as an age.
            publication_record_type*        m_pHead;            ///< Head of active publication list
            publication_record_type*        m_pAllocatedHead;   ///< Head of allocated publication list
            boost::thread_specific_ptr< publication_record_type > m_pThreadRec; ///< Thread-local publication record
            mutable global_lock_type        m_Mutex;            ///< Global mutex
            mutable stat                    m_Stat;             ///< Internal statistics
            unsigned int const              m_nCompactFactor;   ///< Publication list compacting factor (the list will be compacted through \p %m_nCompactFactor combining passes)
            unsigned int const              m_nCombinePassCount; ///< Number of combining passes
            wait_strategy                   m_waitStrategy;     ///< Wait strategy

        public:
            /// Initializes the object
            /**
                Compact factor = 1024

                Combiner pass count = 8
            */
            kernel()
                : kernel( 1024, 8 )
            {}

            /// Initializes the object
            kernel(
                unsigned int nCompactFactor     ///< Publication list compacting factor (the list will be compacted through \p nCompactFactor combining passes)
                ,unsigned int nCombinePassCount ///< Number of combining passes for combiner thread
                )
                : m_nCount( 0 )
                , m_pHead( nullptr )
                , m_pAllocatedHead( nullptr )
                , m_pThreadRec( tls_cleanup )
                , m_nCompactFactor( (unsigned int)( cds::beans::ceil2( nCompactFactor ) - 1 ))   // binary mask
                , m_nCombinePassCount( nCombinePassCount )
            {
                assert( m_pThreadRec.get() == nullptr );
                publication_record_type* pRec = cxx11_allocator().New();
                m_pAllocatedHead =
                    m_pHead = pRec;
                m_pThreadRec.reset( pRec );
                m_Stat.onCreatePubRecord();
            }

            /// Destroys the object and all publication records
            ~kernel()
            {
                m_pThreadRec.reset();   // calls tls_cleanup()

                // delete all publication records
                for ( publication_record* p = m_pAllocatedHead; p; ) {
                    publication_record* pRec = p;
                    p = p->pNextAllocated.load( memory_model::memory_order_relaxed );
                    free_publication_record( static_cast<publication_record_type*>( pRec ));
                }
            }

            /// Gets publication list record for the current thread
            /**
                If there is no publication record for the current thread
                the function allocates it.
            */
            publication_record_type* acquire_record()
            {
                publication_record_type* pRec = m_pThreadRec.get();
                if ( !pRec ) {
                    // Allocate new publication record
                    pRec = cxx11_allocator().New();
                    m_pThreadRec.reset( pRec );
                    m_Stat.onCreatePubRecord();

                    // Insert in allocated list
                    assert( m_pAllocatedHead != nullptr );
                    publication_record* p = m_pAllocatedHead->pNextAllocated.load( memory_model::memory_order_acquire );
                    do {
                        pRec->pNextAllocated.store( p, memory_model::memory_order_relaxed );
                    } while ( !m_pAllocatedHead->pNextAllocated.compare_exchange_weak( p, pRec, memory_model::memory_order_release, atomics::memory_order_acquire ));
                }
                else if ( pRec->nState.load( memory_model::memory_order_acquire ) != active )
                    publish( pRec );

                assert( pRec->op() == req_EmptyRecord );

                return pRec;
            }

            /// Marks publication record for the current thread as empty
            void release_record( publication_record_type* pRec )
            {
                assert( pRec->is_done());
                pRec->nRequest.store( req_EmptyRecord, memory_model::memory_order_release );
            }

            /// Tries to execute operation \p nOpId
            /**
                \p pRec is the publication record acquired by \ref acquire_record earlier.
                \p owner is the container that owns the flat combining kernel object.
                As a result the current thread can become a combiner, or it can wait until
                another combiner performs \p pRec's operation.

                If the thread becomes a combiner, the kernel calls \p owner.fc_apply()
                for each active non-empty publication record.
            */
            template <class Container>
            void combine( unsigned int nOpId, publication_record_type* pRec, Container& owner )
            {
                assert( nOpId >= req_Operation );
                assert( pRec );

                pRec->nRequest.store( nOpId, memory_model::memory_order_release );
                m_Stat.onOperation();

                try_combining( owner, pRec );
            }
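
            // Illustrative usage sketch (container side; NOT part of the kernel).
            // One-by-one processing via combine(). The names m_FC, m_Impl, op_push,
            // my_record and pValue are hypothetical container members:
            //
            //   void push( value_type const& val ) {
            //       my_record* pRec = m_FC.acquire_record();
            //       pRec->pValue = &val;                   // fill the record's fields
            //       m_FC.combine( op_push, pRec, *this );  // may become the combiner
            //       m_FC.release_record( pRec );           // mark the record as empty
            //   }
            //
            //   // called by the kernel for each active non-empty record
            //   void fc_apply( my_record* pRec ) {
            //       if ( pRec->op() == op_push )
            //           m_Impl.push( *pRec->pValue );      // apply to the sequential structure
            //   }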

            /// Tries to execute operation \p nOpId in batch-combine mode
            /**
                \p pRec is the publication record acquired by \p acquire_record() earlier.
                \p owner is the container that owns the flat combining kernel object.
                As a result the current thread can become a combiner, or it can wait until
                another combiner performs \p pRec's operation.

                If the thread becomes a combiner, the kernel calls \p owner.fc_process()
                giving the container full access to the publication list. This function
                is useful for elimination techniques if the container supports any kind of
                them. The container can perform multiple passes over the publication list.

                \p owner.fc_process() has two arguments - forward iterators to the beginning and
                the end of the publication list, see \ref iterator class. For each processed record
                the container should call the \p operation_done() function to mark the record as processed.

                At the end of \p %batch_combine() the \p combine() function is called
                to process the rest of the publication records.
            */
            template <class Container>
            void batch_combine( unsigned int nOpId, publication_record_type* pRec, Container& owner )
            {
                assert( nOpId >= req_Operation );
                assert( pRec );

                pRec->nRequest.store( nOpId, memory_model::memory_order_release );
                m_Stat.onOperation();

                try_batch_combining( owner, pRec );
            }

            /// Invokes \p Func in exclusive mode
            /**
                Some operations in flat combining containers should be called in exclusive mode,
                i.e. the current thread should become the combiner to process the operation.
                The typical example is the \p empty() function.

                \p %invoke_exclusive() allows doing that: the current thread becomes the combiner and
                invokes \p f exclusively, but unlike typical usage the thread does not process any pending request.
                Instead, after \p f returns, the current thread wakes up a pending thread if there is one.
            */
            template <typename Func>
            void invoke_exclusive( Func f )
            {
                {
                    lock_guard l( m_Mutex );
                    f();
                }
                m_waitStrategy.wakeup( *this );
                m_Stat.onInvokeExclusive();
            }
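
            // Illustrative sketch (container side; m_FC and m_Impl are hypothetical
            // members): empty() executed in exclusive mode:
            //
            //   bool empty() {
            //       bool bRes = false;
            //       m_FC.invoke_exclusive( [&]() { bRes = m_Impl.empty(); } );
            //       return bRes;
            //   }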

            /// Marks \p rec as executed
            /**
                This function should be called by the container if the \p batch_combine() mode is used.
                For usual combining (see \p combine()) this function is not needed.
            */
            void operation_done( publication_record& rec )
            {
                rec.nRequest.store( req_Response, memory_model::memory_order_release );
                m_waitStrategy.notify( *this, static_cast<publication_record_type&>( rec ));
            }

            /// Internal statistics
            stat const& statistics() const
            {
                return m_Stat;
            }

            //@cond
            // For container classes based on flat combining
            stat& internal_statistics() const
            {
                return m_Stat;
            }
            //@endcond

            /// Returns the compact factor
            unsigned int compact_factor() const
            {
                return m_nCompactFactor + 1;
            }

            /// Returns number of combining passes for combiner thread
            unsigned int combine_pass_count() const
            {
                return m_nCombinePassCount;
            }

        public:
            /// Publication list iterator
            /**
                Iterators are intended for batch processing by the container's
                \p fc_process function.
                The iterator allows iterating through the active publication list.
            */
            class iterator
            {
                //@cond
                friend class kernel;
                publication_record_type* m_pRec;
                //@endcond

            protected:
                //@cond
                iterator( publication_record_type* pRec )
                    : m_pRec( pRec )
                {
                    skip_inactive();
                }

                void skip_inactive()
                {
                    while ( m_pRec && ( m_pRec->nState.load( memory_model::memory_order_acquire ) != active
                                     || m_pRec->op( memory_model::memory_order_relaxed ) < req_Operation ))
                    {
                        m_pRec = static_cast<publication_record_type*>( m_pRec->pNext.load( memory_model::memory_order_acquire ));
                    }
                }
                //@endcond

            public:
                /// Initializes an empty iterator object
                iterator()
                    : m_pRec( nullptr )
                {}

                /// Copy ctor
                iterator( iterator const& src )
                    : m_pRec( src.m_pRec )
                {}

                /// Pre-increment
                iterator& operator++()
                {
                    assert( m_pRec != nullptr );
                    m_pRec = static_cast<publication_record_type*>( m_pRec->pNext.load( memory_model::memory_order_acquire ));
                    skip_inactive();
                    return *this;
                }

                /// Post-increment
                iterator operator++(int)
                {
                    assert( m_pRec != nullptr );
                    iterator it( *this );
                    ++(*this);
                    return it;
                }

                /// Dereference operator, can return \p nullptr
                publication_record_type* operator ->()
                {
                    return m_pRec;
                }

                /// Dereference operator, the iterator should not be an end iterator
                publication_record_type& operator*()
                {
                    assert( m_pRec != nullptr );
                    return *m_pRec;
                }

                /// Iterator equality
                friend bool operator==( iterator it1, iterator it2 )
                {
                    return it1.m_pRec == it2.m_pRec;
                }

                /// Iterator inequality
                friend bool operator!=( iterator it1, iterator it2 )
                {
                    return !( it1 == it2 );
                }
            };

            /// Returns an iterator to the first active publication record
            iterator begin() { return iterator( m_pHead ); }

            /// Returns an iterator to the end of the publication list. Should not be dereferenced.
            iterator end() { return iterator(); }
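
            // Illustrative fc_process() sketch (container side; NOT part of the kernel).
            // A stack-like container performs one elimination pass over the active records,
            // colliding a pop with a push; each handled record is marked with operation_done().
            // op_pop, pValue, find_push() and m_FC are hypothetical names:
            //
            //   template <typename FCIterator>
            //   void fc_process( FCIterator itBegin, FCIterator itEnd ) {
            //       for ( FCIterator it = itBegin; it != itEnd; ++it ) {
            //           if ( it->op() == op_pop ) {
            //               FCIterator itPush = find_push( itBegin, itEnd ); // hypothetical helper
            //               if ( itPush != itEnd ) {
            //                   it->pValue = itPush->pValue;   // hand the pushed value to the pop
            //                   m_FC.operation_done( *it );
            //                   m_FC.operation_done( *itPush );
            //               }
            //           }
            //       }
            //   }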

            /// Gets the current value of \p rec.nRequest
            /**
                This function is intended to be invoked from a wait strategy
            */
            int get_operation( publication_record& rec )
            {
                return rec.op( memory_model::memory_order_acquire );
            }

            /// Wakes up any waiting thread
            /**
                This function is intended to be invoked from a wait strategy
            */
            void wakeup_any()
            {
                publication_record* pRec = m_pHead;
                while ( pRec ) {
                    if ( pRec->nState.load( memory_model::memory_order_acquire ) == active
                      && pRec->op( memory_model::memory_order_acquire ) >= req_Operation )
                    {
                        m_waitStrategy.notify( *this, static_cast<publication_record_type&>( *pRec ));
                        break;
                    }
                    pRec = pRec->pNext.load( memory_model::memory_order_acquire );
                }
            }

        private:
            //@cond
            static void tls_cleanup( publication_record_type* pRec )
            {
                // pRec that is TLS data should be excluded from publication list
                pRec->nState.store( removed, memory_model::memory_order_release );
            }

            void free_publication_record( publication_record_type* pRec )
            {
                cxx11_allocator().Delete( pRec );
                m_Stat.onDeletePubRecord();
            }

            void publish( publication_record_type* pRec )
            {
                assert( pRec->nState.load( memory_model::memory_order_relaxed ) == inactive );

                pRec->nAge.store( m_nCount.load( memory_model::memory_order_relaxed ), memory_model::memory_order_relaxed );
                pRec->nState.store( active, memory_model::memory_order_relaxed );

                // Insert record to publication list
                if ( m_pHead != static_cast<publication_record*>( pRec )) {
                    publication_record* p = m_pHead->pNext.load( memory_model::memory_order_relaxed );
                    if ( p != static_cast<publication_record*>( pRec )) {
                        do {
                            pRec->pNext.store( p, memory_model::memory_order_relaxed );
                            // Failed CAS changes p
                        } while ( !m_pHead->pNext.compare_exchange_weak( p, static_cast<publication_record*>( pRec ),
                            memory_model::memory_order_release, atomics::memory_order_acquire ));
                        m_Stat.onActivatePubRecord();
                    }
                }
            }

            void republish( publication_record_type* pRec )
            {
                if ( pRec->nState.load( memory_model::memory_order_relaxed ) != active ) {
                    // The record has been excluded from publication list. Reinsert it
                    publish( pRec );
                }
            }
626 void try_combining( Container& owner, publication_record_type* pRec )
628 if ( m_Mutex.try_lock()) {
629 // The thread becomes a combiner
630 lock_guard l( m_Mutex, std::adopt_lock_t());
632 // The record pRec can be excluded from publication list. Re-publish it
636 assert( pRec->op( memory_model::memory_order_relaxed ) == req_Response );
639 // There is another combiner, wait while it executes our request
640 if ( !wait_for_combining( pRec )) {
641 // The thread becomes a combiner
642 lock_guard l( m_Mutex, std::adopt_lock_t());
644 // The record pRec can be excluded from publication list. Re-publish it
648 assert( pRec->op( memory_model::memory_order_relaxed ) == req_Response );

            template <class Container>
            void try_batch_combining( Container& owner, publication_record_type* pRec )
            {
                if ( m_Mutex.try_lock()) {
                    // The thread becomes a combiner
                    lock_guard l( m_Mutex, std::adopt_lock );

                    // The record pRec can be excluded from publication list. Re-publish it
                    republish( pRec );

                    batch_combining( owner );
                    assert( pRec->op( memory_model::memory_order_relaxed ) == req_Response );
                }
                else {
                    // There is another combiner, wait while it executes our request
                    if ( !wait_for_combining( pRec )) {
                        // The thread becomes a combiner
                        lock_guard l( m_Mutex, std::adopt_lock );

                        // The record pRec can be excluded from publication list. Re-publish it
                        republish( pRec );

                        batch_combining( owner );
                        assert( pRec->op( memory_model::memory_order_relaxed ) == req_Response );
                    }
                }
            }

            template <class Container>
            void combining( Container& owner )
            {
                // The thread is a combiner
                assert( !m_Mutex.try_lock());

                unsigned int const nCurAge = m_nCount.fetch_add( 1, memory_model::memory_order_relaxed ) + 1;

                unsigned int nEmptyPassCount = 0;
                unsigned int nUsefulPassCount = 0;
                for ( unsigned int nPass = 0; nPass < m_nCombinePassCount; ++nPass ) {
                    if ( combining_pass( owner, nCurAge ))
                        ++nUsefulPassCount;
                    else if ( ++nEmptyPassCount > nUsefulPassCount )
                        break;
                }

                m_Stat.onCombining();
                if ( ( nCurAge & m_nCompactFactor ) == 0 )
                    compact_list( nCurAge );
            }

            template <class Container>
            bool combining_pass( Container& owner, unsigned int nCurAge )
            {
                publication_record* p = m_pHead;
                bool bOpDone = false;
                while ( p ) {
                    switch ( p->nState.load( memory_model::memory_order_acquire )) {
                    case active:
                        if ( p->op() >= req_Operation ) {
                            p->nAge.store( nCurAge, memory_model::memory_order_relaxed );
                            owner.fc_apply( static_cast<publication_record_type*>( p ));
                            operation_done( *p );
                            bOpDone = true;
                        }
                        break;
                    case inactive:
                        // Only m_pHead can be inactive in the publication list
                        assert( p == m_pHead );
                        break;
                    case removed:
                        // Such a record will be removed on the compacting phase
                        break;
                    default:
                        // unreachable: no other record states exist
                        assert( false );
                    }
                    p = p->pNext.load( memory_model::memory_order_acquire );
                }
                return bOpDone;
            }

            template <class Container>
            void batch_combining( Container& owner )
            {
                // The thread is a combiner
                assert( !m_Mutex.try_lock());

                unsigned int const nCurAge = m_nCount.fetch_add( 1, memory_model::memory_order_relaxed ) + 1;

                for ( unsigned int nPass = 0; nPass < m_nCombinePassCount; ++nPass )
                    owner.fc_process( begin(), end());

                combining_pass( owner, nCurAge );
                m_Stat.onCombining();
                if ( ( nCurAge & m_nCompactFactor ) == 0 )
                    compact_list( nCurAge );
            }

            bool wait_for_combining( publication_record_type* pRec )
            {
                m_waitStrategy.prepare( *pRec );
                m_Stat.onPassiveWait();

                while ( pRec->op( memory_model::memory_order_acquire ) != req_Response ) {
                    // The record can be excluded from publication list. Reinsert it
                    republish( pRec );

                    m_Stat.onPassiveWaitIteration();

                    // Wait while the operation is processed
                    if ( m_waitStrategy.wait( *this, *pRec ))
                        m_Stat.onWakeupByNotifying();

                    if ( m_Mutex.try_lock()) {
                        if ( pRec->op( memory_model::memory_order_acquire ) == req_Response ) {
                            // The operation is done
                            m_Mutex.unlock();

                            // Wake up pending threads
                            m_waitStrategy.wakeup( *this );
                            m_Stat.onPassiveWaitWakeup();

                            break;
                        }
                        // The thread becomes a combiner
                        m_Stat.onPassiveToCombiner();
                        return false;
                    }
                }
                return true;
            }

            void compact_list( unsigned int nCurAge )
            {
                // Compacts publication list
                // This function is called only by combiner thread

                publication_record* pPrev = m_pHead;
                for ( publication_record* p = pPrev->pNext.load( memory_model::memory_order_acquire ); p; ) {
                    switch ( p->nState.load( memory_model::memory_order_relaxed )) {
                    case active:
                        if ( p->nAge.load( memory_model::memory_order_relaxed ) + m_nCompactFactor < nCurAge )
                        {
                            publication_record* pNext = p->pNext.load( memory_model::memory_order_relaxed );
                            if ( pPrev->pNext.compare_exchange_strong( p, pNext,
                                memory_model::memory_order_acquire, atomics::memory_order_relaxed ))
                            {
                                p->nState.store( inactive, memory_model::memory_order_release );
                                p = pNext;
                                m_Stat.onDeactivatePubRecord();
                                continue;
                            }
                        }
                        break;
                    case removed: {
                        publication_record* pNext = p->pNext.load( memory_model::memory_order_acquire );
                        if ( cds_likely( pPrev->pNext.compare_exchange_strong( p, pNext, memory_model::memory_order_acquire, atomics::memory_order_relaxed ))) {
                            p = pNext;
                            continue;
                        }
                        // CAS can fail only at the beginning of the list
                        assert( pPrev == m_pHead );
                        break;
                    }
                    default:
                        break;
                    }
                    pPrev = p;
                    p = p->pNext.load( memory_model::memory_order_acquire );
                }

                // Iterate over allocated list to find removed records
                pPrev = m_pAllocatedHead;
                for ( publication_record* p = pPrev->pNextAllocated.load( memory_model::memory_order_acquire ); p; ) {
                    if ( p->nState.load( memory_model::memory_order_relaxed ) == removed ) {
                        publication_record* pNext = p->pNextAllocated.load( memory_model::memory_order_relaxed );
                        if ( pPrev->pNextAllocated.compare_exchange_strong( p, pNext, memory_model::memory_order_acquire, atomics::memory_order_relaxed )) {
                            free_publication_record( static_cast<publication_record_type*>( p ));
                            p = pNext;
                            continue;
                        }
                    }
                    pPrev = p;
                    p = p->pNextAllocated.load( memory_model::memory_order_relaxed );
                }

                m_Stat.onCompactPublicationList();
            }
            //@endcond
        };

        //@cond
        // Dummy container-side interface: fc_apply() / fc_process() must be
        // provided by the container that owns the kernel
        class container
        {
        public:
            template <typename PubRecord>
            void fc_apply( PubRecord* )
            {
                assert( false );
            }

            template <typename Iterator>
            void fc_process( Iterator, Iterator )
            {
                assert( false );
            }
        };
        //@endcond

    } // namespace flat_combining
}} // namespace cds::algo

#endif // #ifndef CDSLIB_ALGO_FLAT_COMBINING_KERNEL_H