/*
    This file is a part of libcds - Concurrent Data Structures library

    (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2016

    Source code repo: http://github.com/khizmax/libcds/
    Download: http://sourceforge.net/projects/libcds/files/

    Redistribution and use in source and binary forms, with or without
    modification, are permitted provided that the following conditions are met:

    * Redistributions of source code must retain the above copyright notice, this
      list of conditions and the following disclaimer.

    * Redistributions in binary form must reproduce the above copyright notice,
      this list of conditions and the following disclaimer in the documentation
      and/or other materials provided with the distribution.

    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
    AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
    IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
    DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
    FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
    DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
    SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
    OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
    OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef CDSLIB_ALGO_FLAT_COMBINING_H
#define CDSLIB_ALGO_FLAT_COMBINING_H

#include <cds/algo/atomic.h>
#include <cds/details/allocator.h>
#include <cds/algo/backoff_strategy.h>
#include <cds/sync/spinlock.h>
#include <cds/opt/options.h>
#include <cds/algo/int_algo.h>
#include <boost/thread/tss.hpp>     // thread_specific_ptr
namespace cds { namespace algo {

    /// @defgroup cds_flat_combining_intrusive Intrusive flat combining containers
    /// @defgroup cds_flat_combining_container Non-intrusive flat combining containers
    /// Flat combining
    /**
        @anchor cds_flat_combining_description
        The flat combining (FC) technique was introduced by Hendler, Incze, Shavit, and Tzafrir
        in their 2010 paper <i>"Flat Combining and the Synchronization-Parallelism Tradeoff"</i>.
        The technique converts a sequential data structure into its concurrent implementation.
        A few structures are added to the sequential implementation: a <i>global lock</i>,
        a <i>count</i> of the number of combining passes, and a pointer to the <i>head</i>
        of a <i>publication list</i>. The publication list is a list of thread-local records
        of a size proportional to the number of threads that are concurrently accessing the shared object.
        Each thread \p t accessing the structure to perform an invocation of some method \p m
        on the shared object executes the following sequence of steps:
        <ol>
        <li>Write the invocation opcode and parameters (if any) of the method \p m to be applied
        sequentially to the shared object in the <i>request</i> field of your thread-local publication
        record (there is no need to use a load-store memory barrier). The <i>request</i> field will later
        be used to receive the response. If your thread-local publication record is marked as active,
        continue to step 2; otherwise continue to step 5.</li>
        <li>Check if the global lock is taken. If so (another thread is an active combiner), spin on the <i>request</i>
        field waiting for a response to the invocation (one can add a yield at this point to allow other threads
        on the same core to run). While spinning, check once in a while that the lock is still taken and that your
        record is active. If your record is inactive, proceed to step 5. Once the response is available,
        reset the request field to null and return the response.</li>
        <li>If the lock is not taken, attempt to acquire it and become a combiner. If you fail,
        return to spinning in step 2.</li>
        <li>Otherwise, you hold the lock and are a combiner.
        <ul>
        <li>Increment the combining pass <i>count</i> by one.</li>
        <li>Execute \p fc_apply() by traversing the publication list from the head,
        combining all non-null method call invocations, setting the <i>age</i> of each of these records
        to the current <i>count</i>, applying the combined method calls to the structure, and returning
        responses to all the invocations. This traversal is guaranteed to be wait-free.</li>
        <li>If the <i>count</i> is such that a cleanup needs to be performed, traverse the publication
        list from the <i>head</i>. Starting from the second item (we always leave the item pointed to
        by the head in the list), remove from the publication list all records whose <i>age</i> is
        much smaller than the current <i>count</i>, by unlinking each such node and marking it
        as inactive.</li>
        <li>Release the lock.</li>
        </ul></li>
        <li>If you have no thread-local publication record, allocate one, marked as active. If you already
        have one marked as inactive, mark it as active. Execute a store-load memory barrier. Proceed to insert
        the record into the list with a successful CAS to the <i>head</i>. Then proceed to step 1.</li>
        </ol>
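        A simplified, illustrative sketch of this per-thread protocol (every name here is
        hypothetical, not part of the libcds API):
        \code
        void invoke( unsigned int opcode )
        {
            my_rec->set_request( opcode );          // step 1: publish the request
            for (;;) {
                if ( global_lock.try_lock() ) {     // step 3: become the combiner
                    ++combining_pass_count;         // step 4: perform a combining pass,
                    apply_all_published_requests(); //   i.e. fc_apply() for each active record
                    global_lock.unlock();
                    return;
                }
                if ( my_rec->response_ready() )     // step 2: spin waiting for a response
                    return;
                std::this_thread::yield();
            }
        }
        \endcode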
        As the test results show, the flat combining technique is well suited to non-intrusive containers
        such as stacks, queues, and deques. For intrusive concurrent containers flat combining demonstrates
        less impressive results.

        \ref cds_flat_combining_container "List of FC-based containers" in libcds.

        \ref cds_flat_combining_intrusive "List of intrusive FC-based containers" in libcds.
    */
    namespace flat_combining {
        /// Special values of \p publication_record::nRequest
        enum request_value
        {
            req_EmptyRecord,    ///< Publication record is empty
            req_Response,       ///< Operation is done

            req_Operation       ///< First operation id for derived classes
        };
        /// \p publication_record state
        enum record_state {
            inactive,       ///< Record is inactive
            active,         ///< Record is active
            removed         ///< Record should be removed
        };
        /// Record of publication list
        /**
            Each data structure based on flat combining contains a class derived from \p %publication_record
        */
        struct publication_record {
            atomics::atomic<unsigned int>           nRequest;   ///< Request field (depends on data structure)
            atomics::atomic<unsigned int>           nState;     ///< Record state: inactive, active, removed
            atomics::atomic<unsigned int>           nAge;       ///< Age of the record
            atomics::atomic<publication_record *>   pNext;      ///< Next record in publication list
            void *                                  pOwner;     ///< [internal data] Pointer to \ref kernel object that manages the publication list

            /// Initializes publication record
            publication_record()
                : nRequest( req_EmptyRecord )
                , nState( inactive )
                , nAge( 0 )
                , pNext( nullptr )
                , pOwner( nullptr )
            {}

            /// Returns the value of \p nRequest field
            unsigned int op() const
            {
                return nRequest.load( atomics::memory_order_relaxed );
            }

            /// Checks if the operation is done
            bool is_done() const
            {
                return nRequest.load( atomics::memory_order_relaxed ) == req_Response;
            }
        };
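        /*
            A container typically keeps its operation arguments and results in a record
            derived from \p publication_record. A minimal illustrative sketch (the
            \p pValue member is hypothetical, not part of this header):
            \code
            struct fc_record: public cds::algo::flat_combining::publication_record
            {
                int * pValue;   // operation argument / result
            };
            \endcode
        */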
        /// Flat combining internal statistics
        template <typename Counter = cds::atomicity::event_counter >
        struct stat
        {
            typedef Counter counter_type;   ///< Event counter type

            counter_type    m_nOperationCount;          ///< How many operations have been performed
            counter_type    m_nCombiningCount;          ///< Combining call count
            counter_type    m_nCompactPublicationList;  ///< Count of publication list compacting
            counter_type    m_nDeactivatePubRecord;     ///< How many publication records were deactivated during compacting
            counter_type    m_nActivatePubRecord;       ///< Count of publication record activating
            counter_type    m_nPubRecordCreated;        ///< Count of created publication records
            counter_type    m_nPubRecordDeleted;        ///< Count of deleted publication records
            counter_type    m_nAcquirePubRecCount;      ///< Count of acquiring a publication record
            counter_type    m_nReleasePubRecCount;      ///< Count of releasing a publication record

            /// Returns the current combining factor
            /**
                The combining factor is the average number of operations performed in one combining pass:
                <tt>combining_factor := m_nOperationCount / m_nCombiningCount</tt>
            */
            double combining_factor() const
            {
                return m_nCombiningCount.get() ? double( m_nOperationCount.get()) / m_nCombiningCount.get() : 0.0;
            }

            void onOperation()              { ++m_nOperationCount; }
            void onCombining()              { ++m_nCombiningCount; }
            void onCompactPublicationList() { ++m_nCompactPublicationList; }
            void onDeactivatePubRecord()    { ++m_nDeactivatePubRecord; }
            void onActivatePubRecord()      { ++m_nActivatePubRecord; }
            void onCreatePubRecord()        { ++m_nPubRecordCreated; }
            void onDeletePubRecord()        { ++m_nPubRecordDeleted; }
            void onAcquirePubRecord()       { ++m_nAcquirePubRecCount; }
            void onReleasePubRecord()       { ++m_nReleasePubRecCount; }
        };
        /// Flat combining dummy internal statistics
        struct empty_stat
        {
            void onOperation()              {}
            void onCombining()              {}
            void onCompactPublicationList() {}
            void onDeactivatePubRecord()    {}
            void onActivatePubRecord()      {}
            void onCreatePubRecord()        {}
            void onDeletePubRecord()        {}
            void onAcquirePubRecord()       {}
            void onReleasePubRecord()       {}
        };
        /// Type traits of \ref kernel class
        /**
            You can define different type traits for \ref kernel
            by specifying your struct based on \p %traits
            or by using \ref make_traits metafunction.
        */
        struct traits
        {
            typedef cds::sync::spin             lock_type;      ///< Lock type
            typedef cds::backoff::delay_of<2>   back_off;       ///< Back-off strategy
            typedef CDS_DEFAULT_ALLOCATOR       allocator;      ///< Allocator used for TLS data (allocating \p publication_record derivatives)
            typedef empty_stat                  stat;           ///< Internal statistics
            typedef opt::v::relaxed_ordering    memory_model;   ///< C++ memory ordering model
        };
        /// Metafunction converting option list to traits
        /**
            \p Options are:
            - \p opt::lock_type - mutex type, default is \p cds::sync::spin
            - \p opt::back_off - back-off strategy, default is \p cds::backoff::delay_of<2>
            - \p opt::allocator - allocator type, default is \ref CDS_DEFAULT_ALLOCATOR
            - \p opt::stat - internal statistics, possible types: \ref stat, \ref empty_stat (the default)
            - \p opt::memory_model - C++ memory ordering model.
                For the list of all available memory ordering models see \p opt::memory_model.
                The default is \p cds::opt::v::relaxed_ordering.
        */
        template <typename... Options>
        struct make_traits {
#   ifdef CDS_DOXYGEN_INVOKED
            typedef implementation_defined type;    ///< Metafunction result
#   else
            typedef typename cds::opt::make_options<
                typename cds::opt::find_type_traits< traits, Options... >::type
                , Options...
            >::type type;
#   endif
        };
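        /*
            For example, a traits type that turns on internal statistics could be built as
            follows (an illustrative sketch):
            \code
            typedef cds::algo::flat_combining::make_traits<
                cds::opt::stat< cds::algo::flat_combining::stat<> >
            >::type my_fc_traits;
            \endcode
        */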
        /// The kernel of flat combining
        /**
            Template parameters:
            - \p PublicationRecord - a type derived from \ref publication_record
            - \p Traits - a type traits of flat combining, default is \p flat_combining::traits.
              The \ref make_traits metafunction can be used to create type traits.

            The kernel object should be a member of a container class. The container cooperates with the
            flat combining kernel object. There are two ways to interact with the kernel:
            - One-by-one processing of the active records of the publication list. This mode is provided by the
              \p combine() function: the container acquires its publication record with \p acquire_record(),
              fills its fields and calls the \p combine() function of its kernel object. If the current thread
              becomes a combiner, the kernel calls the \p fc_apply() function of the container for each active
              non-empty record. Then the container should release its publication record with \p release_record().
              Only one pass through the publication list is possible.
            - Batch processing - the \p batch_combine() function. In this mode the container obtains access
              to the entire publication list. This mode allows the container to perform elimination: for example,
              a stack can collide \p push() and \p pop() requests. The sequence of invocations is the following:
              the container acquires its publication record with \p acquire_record(), fills its fields and calls
              the \p batch_combine() function of its kernel object. If the current thread becomes a combiner,
              the kernel calls the \p fc_process() function of the container passing two iterators pointing to
              the beginning and the end of the publication list (see \ref iterator class). The iterators allow
              multiple passes through the active records of the publication list. For each processed record the
              container should call the \p operation_done() function. At the end, the container should release
              its record with \p release_record(). A sketch of a container built on \p combine() is given below.
        */
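        /*
            A minimal illustrative sketch of a container built on the \p combine() interface
            (the \p FCCounter class and its \p op_inc code are assumptions for this example,
            not part of libcds):
            \code
            class FCCounter {
                struct fc_record: cds::algo::flat_combining::publication_record {};
                enum { op_inc = cds::algo::flat_combining::req_Operation };

                cds::algo::flat_combining::kernel<fc_record>    m_FC;
                int m_nCounter = 0;     // the sequential structure protected by flat combining

            public:
                void inc()
                {
                    fc_record * pRec = m_FC.acquire_record();
                    m_FC.combine( op_inc, pRec, *this );    // the current thread may become a combiner
                    m_FC.release_record( pRec );
                }

                // Called by the kernel for each active non-empty record
                void fc_apply( fc_record * pRec )
                {
                    if ( pRec->op() == op_inc )
                        ++m_nCounter;
                }
            };
            \endcode
        */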
        template <
            typename PublicationRecord
            , typename Traits = traits
        >
        class kernel
        {
        public:
            typedef PublicationRecord   publication_record_type;   ///< Publication record type
            typedef Traits              traits;                    ///< Type traits
            typedef typename traits::lock_type      global_lock_type;   ///< Global lock type
            typedef typename traits::back_off       back_off;           ///< Back-off strategy type
            typedef typename traits::allocator      allocator;          ///< Allocator type (used for allocating \p publication_record_type data)
            typedef typename traits::stat           stat;               ///< Internal statistics
            typedef typename traits::memory_model   memory_model;       ///< C++ memory model

        protected:
            typedef cds::details::Allocator< publication_record_type, allocator >   cxx11_allocator;    ///< Internal helper \p cds::details::Allocator
            typedef std::lock_guard<global_lock_type>   lock_guard;
            atomics::atomic<unsigned int>   m_nCount;   ///< Total count of combining passes. Used as an age.
            publication_record_type *       m_pHead;    ///< Head of publication list
            boost::thread_specific_ptr< publication_record_type >   m_pThreadRec;  ///< Thread-local publication record
            mutable global_lock_type        m_Mutex;    ///< Global mutex
            mutable stat                    m_Stat;     ///< Internal statistics
            unsigned int const              m_nCompactFactor;    ///< Publication list compacting factor (the list is compacted every \p %m_nCompactFactor combining passes)
            unsigned int const              m_nCombinePassCount; ///< Number of combining passes
        public:
            /// Initializes the object
            /**
                Compact factor = 64
                Combiner pass count = 8
            */
            kernel()
                : m_nCount( 0 )
                , m_pHead( nullptr )
                , m_pThreadRec( tls_cleanup )
                , m_nCompactFactor( 64 - 1 )    // binary mask
                , m_nCombinePassCount( 8 )
            {
                init();
            }
            /// Initializes the object
            kernel(
                unsigned int nCompactFactor     ///< Publication list compacting factor (the list is compacted every \p nCompactFactor combining passes)
                , unsigned int nCombinePassCount ///< Number of combining passes for combiner thread
                )
                : m_nCount( 0 )
                , m_pHead( nullptr )
                , m_pThreadRec( tls_cleanup )
                , m_nCompactFactor( (unsigned int)( cds::beans::ceil2( nCompactFactor ) - 1 ))  // binary mask
                , m_nCombinePassCount( nCombinePassCount )
            {
                init();
            }
            /// Destroys the object and detaches all publication records
            ~kernel()
            {
                // mark all publication records as detached
                for ( publication_record * p = m_pHead; p; ) {
                    p->pOwner = nullptr;

                    publication_record * pRec = p;
                    p = p->pNext.load( memory_model::memory_order_relaxed );
                    if ( pRec->nState.load( memory_model::memory_order_acquire ) == removed )
                        free_publication_record( static_cast<publication_record_type *>( pRec ));
                }
            }
            /// Gets the publication list record for the current thread
            /**
                If there is no publication record for the current thread,
                the function allocates it.
            */
            publication_record_type * acquire_record()
            {
                publication_record_type * pRec = m_pThreadRec.get();
                if ( !pRec ) {
                    // Allocate new publication record
                    pRec = cxx11_allocator().New();
                    pRec->pOwner = reinterpret_cast<void *>( this );
                    m_pThreadRec.reset( pRec );
                    m_Stat.onCreatePubRecord();
                }

                if ( pRec->nState.load( memory_model::memory_order_acquire ) != active )
                    publish( pRec );

                assert( pRec->nRequest.load( memory_model::memory_order_relaxed ) == req_EmptyRecord );

                m_Stat.onAcquirePubRecord();
                return pRec;
            }
            /// Marks the publication record of the current thread as empty
            void release_record( publication_record_type * pRec )
            {
                assert( pRec->is_done() );
                pRec->nRequest.store( req_EmptyRecord, memory_model::memory_order_release );
                m_Stat.onReleasePubRecord();
            }
            /// Tries to execute operation \p nOpId
            /**
                \p pRec is the publication record acquired by \ref acquire_record earlier.
                \p owner is the container that owns the flat combining kernel object.
                As a result, the current thread can become a combiner, or it can wait until
                another combiner performs the \p pRec operation.

                If the thread becomes a combiner, the kernel calls \p owner.fc_apply
                for each active non-empty publication record.
            */
            template <class Container>
            void combine( unsigned int nOpId, publication_record_type * pRec, Container& owner )
            {
                assert( nOpId >= req_Operation );
                assert( pRec );
                //assert( pRec->nState.load( memory_model::memory_order_relaxed ) == active );
                pRec->nRequest.store( nOpId, memory_model::memory_order_release );

                m_Stat.onOperation();

                try_combining( owner, pRec );
            }
            /// Tries to execute operation \p nOpId in batch-combine mode
            /**
                \p pRec is the publication record acquired by \ref acquire_record earlier.
                \p owner is the container that owns the flat combining kernel object.
                As a result, the current thread can become a combiner, or it can wait until
                another combiner performs the \p pRec operation.

                If the thread becomes a combiner, the kernel calls \p owner.fc_process
                giving the container full access to the publication list. This function
                is useful for elimination techniques, if the container supports any.
                The container can perform multiple passes through the publication list.

                \p owner.fc_process has two arguments - forward iterators to the beginning and the end of
                the publication list, see \ref iterator class. For each processed record the container
                should call the \ref operation_done function to mark the record as processed.

                At the end of \p %batch_combine the \ref combine function is called
                to process the rest of the publication records. An illustrative sketch of
                \p fc_process is given after this function.
            */
            template <class Container>
            void batch_combine( unsigned int nOpId, publication_record_type * pRec, Container& owner )
            {
                assert( nOpId >= req_Operation );
                assert( pRec );
                //assert( pRec->nState.load( memory_model::memory_order_relaxed ) == active );
                pRec->nRequest.store( nOpId, memory_model::memory_order_release );

                m_Stat.onOperation();

                try_batch_combining( owner, pRec );
            }
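            /*
                An illustrative sketch of \p fc_process performing push/pop elimination for a
                hypothetical FC-based stack; the record layout (\p op_push, \p op_pop, \p pValue)
                and the \p find_pending_pop helper are assumptions, not part of this header:
                \code
                template <typename FCIterator>
                void fc_process( FCIterator itBegin, FCIterator itEnd )
                {
                    for ( FCIterator it = itBegin; it != itEnd; ++it ) {
                        if ( it->op() == op_push ) {
                            FCIterator itPop = find_pending_pop( it, itEnd );   // hypothetical helper
                            if ( itPop != itEnd ) {
                                // Collide the pair: hand the pushed value to the popper
                                itPop->pValue = it->pValue;
                                m_FC.operation_done( *it );
                                m_FC.operation_done( *itPop );
                            }
                        }
                    }
                }
                \endcode
            */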
            /// Waits for the end of combining
            void wait_while_combining() const
            {
                lock_guard l( m_Mutex );
            }
            /// Marks \p rec as executed
            /**
                This function should be called by the container if the \p batch_combine mode is used.
                For usual combining (see \ref combine) this function is not needed.
            */
            void operation_done( publication_record& rec )
            {
                rec.nRequest.store( req_Response, memory_model::memory_order_release );
            }
            /// Internal statistics
            stat const& statistics() const
            {
                return m_Stat;
            }

            // For container classes based on flat combining
            stat& internal_statistics() const
            {
                return m_Stat;
            }

            /// Returns the compact factor
            unsigned int compact_factor() const
            {
                return m_nCompactFactor + 1;
            }

            /// Returns the number of combining passes for the combiner thread
            unsigned int combine_pass_count() const
            {
                return m_nCombinePassCount;
            }
            /// Publication list iterator
            /**
                Iterators are intended for batch processing by the container's
                \p fc_process function.
                The iterator allows iterating over the active records of the publication list.
            */
            class iterator
            {
                friend class kernel;
                publication_record_type * m_pRec;

                iterator( publication_record_type * pRec )
                    : m_pRec( pRec )
                {
                    skip_inactive();
                }

                // Skips records that are inactive or hold no pending operation
                void skip_inactive()
                {
                    while ( m_pRec && ( m_pRec->nState.load( memory_model::memory_order_acquire ) != active
                                     || m_pRec->nRequest.load( memory_model::memory_order_relaxed ) < req_Operation ))
                        m_pRec = static_cast<publication_record_type *>( m_pRec->pNext.load( memory_model::memory_order_acquire ));
                }

            public:
                /// Initializes an empty iterator object
                iterator() : m_pRec( nullptr ) {}

                /// Copy ctor
                iterator( iterator const& src ) : m_pRec( src.m_pRec ) {}

                /// Pre-increment
                iterator& operator++()
                {
                    assert( m_pRec );
                    m_pRec = static_cast<publication_record_type *>( m_pRec->pNext.load( memory_model::memory_order_acquire ));
                    skip_inactive();
                    return *this;
                }

                /// Post-increment
                iterator operator++( int )
                {
                    iterator it( *this );
                    ++(*this);
                    return it;
                }

                /// Dereference operator, can return \p nullptr
                publication_record_type * operator->() { return m_pRec; }

                /// Dereference operator, the iterator should not be an end iterator
                publication_record_type& operator*()
                {
                    assert( m_pRec );
                    return *m_pRec;
                }

                /// Iterator equality
                friend bool operator==( iterator it1, iterator it2 ) { return it1.m_pRec == it2.m_pRec; }

                /// Iterator inequality
                friend bool operator!=( iterator it1, iterator it2 ) { return !( it1 == it2 ); }
            };

            /// Returns an iterator to the first active publication record
            iterator begin()    { return iterator( m_pHead ); }

            /// Returns an iterator to the end of the publication list. Should not be dereferenced.
            iterator end()      { return iterator(); }
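            /*
                The iterator skips inactive and empty records automatically, so a container's
                \p fc_process can simply walk the range (an illustrative sketch; \p apply is
                a hypothetical container function):
                \code
                void fc_process( iterator itBegin, iterator itEnd )
                {
                    for ( iterator it = itBegin; it != itEnd; ++it ) {
                        if ( !it->is_done() )
                            apply( *it );
                    }
                }
                \endcode
            */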
        protected:
            static void tls_cleanup( publication_record_type * pRec )
            {
                // Thread has terminated.
                // pRec that is TLS data should be excluded from the publication list
                if ( pRec ) {
                    if ( pRec->pOwner ) {
                        // kernel is alive
                        pRec->nState.store( removed, memory_model::memory_order_release );
                    }
                    else {
                        // kernel already deleted
                        free_publication_record( pRec );
                    }
                }
            }

            static void free_publication_record( publication_record_type * pRec )
            {
                cxx11_allocator().Delete( pRec );
            }
            void init()
            {
                assert( m_pThreadRec.get() == nullptr );
                publication_record_type * pRec = cxx11_allocator().New();
                m_pHead = pRec;
                pRec->pOwner = this;
                m_pThreadRec.reset( pRec );
                m_Stat.onCreatePubRecord();
            }
            void publish( publication_record_type * pRec )
            {
                assert( pRec->nState.load( memory_model::memory_order_relaxed ) == inactive );

                pRec->nAge.store( m_nCount.load( memory_model::memory_order_acquire ), memory_model::memory_order_release );
                pRec->nState.store( active, memory_model::memory_order_release );

                // Insert record to publication list
                if ( m_pHead != static_cast<publication_record *>( pRec )) {
                    publication_record * p = m_pHead->pNext.load( memory_model::memory_order_relaxed );
                    if ( p != static_cast<publication_record *>( pRec )) {
                        do {
                            pRec->pNext = p;
                            // Failed CAS changes p
                        } while ( !m_pHead->pNext.compare_exchange_weak( p, static_cast<publication_record *>( pRec ),
                            memory_model::memory_order_release, atomics::memory_order_relaxed ));
                        m_Stat.onActivatePubRecord();
                    }
                }
            }
            void republish( publication_record_type * pRec )
            {
                if ( pRec->nState.load( memory_model::memory_order_relaxed ) != active ) {
                    // The record has been excluded from the publication list. Reinsert it
                    publish( pRec );
                }
            }
            template <class Container>
            void try_combining( Container& owner, publication_record_type * pRec )
            {
                if ( m_Mutex.try_lock() ) {
                    // The thread becomes a combiner
                    lock_guard l( m_Mutex, std::adopt_lock_t() );

                    // The record pRec can be excluded from the publication list. Re-publish it
                    republish( pRec );

                    combining( owner );
                    assert( pRec->nRequest.load( memory_model::memory_order_relaxed ) == req_Response );
                }
                else {
                    // There is another combiner, wait while it executes our request
                    if ( !wait_for_combining( pRec )) {
                        // The thread becomes a combiner
                        lock_guard l( m_Mutex, std::adopt_lock_t() );

                        // The record pRec can be excluded from the publication list. Re-publish it
                        republish( pRec );

                        combining( owner );
                        assert( pRec->nRequest.load( memory_model::memory_order_relaxed ) == req_Response );
                    }
                }
            }
            template <class Container>
            void try_batch_combining( Container& owner, publication_record_type * pRec )
            {
                if ( m_Mutex.try_lock() ) {
                    // The thread becomes a combiner
                    lock_guard l( m_Mutex, std::adopt_lock_t() );

                    // The record pRec can be excluded from the publication list. Re-publish it
                    republish( pRec );

                    batch_combining( owner );
                    assert( pRec->nRequest.load( memory_model::memory_order_relaxed ) == req_Response );
                }
                else {
                    // There is another combiner, wait while it executes our request
                    if ( !wait_for_combining( pRec )) {
                        // The thread becomes a combiner
                        lock_guard l( m_Mutex, std::adopt_lock_t() );

                        // The record pRec can be excluded from the publication list. Re-publish it
                        republish( pRec );

                        batch_combining( owner );
                        assert( pRec->nRequest.load( memory_model::memory_order_relaxed ) == req_Response );
                    }
                }
            }
            template <class Container>
            void combining( Container& owner )
            {
                // The thread is a combiner
                assert( !m_Mutex.try_lock() );

                unsigned int const nCurAge = m_nCount.fetch_add( 1, memory_model::memory_order_release ) + 1;

                for ( unsigned int nPass = 0; nPass < m_nCombinePassCount; ++nPass ) {
                    if ( !combining_pass( owner, nCurAge ))
                        break;
                }

                m_Stat.onCombining();
                if ( ( nCurAge & m_nCompactFactor ) == 0 )
                    compact_list( nCurAge );
            }
            template <class Container>
            bool combining_pass( Container& owner, unsigned int nCurAge )
            {
                publication_record * pPrev = nullptr;
                publication_record * p = m_pHead;
                bool bOpDone = false;
                while ( p ) {
                    switch ( p->nState.load( memory_model::memory_order_acquire )) {
                        case active:
                            if ( p->op() >= req_Operation ) {
                                p->nAge.store( nCurAge, memory_model::memory_order_release );
                                owner.fc_apply( static_cast<publication_record_type *>( p ));
                                operation_done( *p );
                                bOpDone = true;
                            }
                            break;
                        case inactive:
                            // Only m_pHead can be inactive in the publication list
                            assert( p == m_pHead );
                            break;
                        case removed:
                            // The record should be removed
                            p = unlink_and_delete_record( pPrev, p );
                            continue;
                        default:
                            // Unreachable: a record can only be inactive, active, or removed
                            assert( false );
                            break;
                    }
                    pPrev = p;
                    p = p->pNext.load( memory_model::memory_order_acquire );
                }
                return bOpDone;
            }
            template <class Container>
            void batch_combining( Container& owner )
            {
                // The thread is a combiner
                assert( !m_Mutex.try_lock() );

                unsigned int const nCurAge = m_nCount.fetch_add( 1, memory_model::memory_order_release ) + 1;

                for ( unsigned int nPass = 0; nPass < m_nCombinePassCount; ++nPass )
                    owner.fc_process( begin(), end() );

                combining_pass( owner, nCurAge );
                m_Stat.onCombining();
                if ( ( nCurAge & m_nCompactFactor ) == 0 )
                    compact_list( nCurAge );
            }
            bool wait_for_combining( publication_record_type * pRec )
            {
                back_off bkoff;
                while ( pRec->nRequest.load( memory_model::memory_order_acquire ) != req_Response ) {

                    // The record can be excluded from the publication list. Reinsert it
                    republish( pRec );

                    bkoff();

                    if ( m_Mutex.try_lock() ) {
                        if ( pRec->nRequest.load( memory_model::memory_order_acquire ) == req_Response ) {
                            m_Mutex.unlock();
                            break;
                        }
                        // The thread becomes a combiner
                        return false;
                    }
                }
                return true;
            }
            void compact_list( unsigned int const nCurAge )
            {
                // Thinning the publication list
                publication_record * pPrev = nullptr;
                for ( publication_record * p = m_pHead; p; ) {
                    if ( p->nState.load( memory_model::memory_order_acquire ) == active
                      && p->nAge.load( memory_model::memory_order_acquire ) + m_nCompactFactor < nCurAge )
                    {
                        if ( pPrev ) {
                            publication_record * pNext = p->pNext.load( memory_model::memory_order_acquire );
                            if ( pPrev->pNext.compare_exchange_strong( p, pNext,
                                memory_model::memory_order_release, atomics::memory_order_relaxed ))
                            {
                                p->nState.store( inactive, memory_model::memory_order_release );
                                p = pNext;
                                m_Stat.onDeactivatePubRecord();
                                continue;
                            }
                        }
                    }
                    pPrev = p;
                    p = p->pNext.load( memory_model::memory_order_acquire );
                }

                m_Stat.onCompactPublicationList();
            }
            publication_record * unlink_and_delete_record( publication_record * pPrev, publication_record * p )
            {
                if ( pPrev ) {
                    publication_record * pNext = p->pNext.load( memory_model::memory_order_acquire );
                    if ( pPrev->pNext.compare_exchange_strong( p, pNext,
                        memory_model::memory_order_release, atomics::memory_order_relaxed ))
                    {
                        free_publication_record( static_cast<publication_record_type *>( p ));
                        m_Stat.onDeletePubRecord();
                    }
                    return pNext;
                }
                else {
                    m_pHead = static_cast<publication_record_type *>( p->pNext.load( memory_model::memory_order_acquire ));
                    free_publication_record( static_cast<publication_record_type *>( p ));
                    m_Stat.onDeletePubRecord();
                    return m_pHead;
                }
            }
        };
        //@cond
        template <typename PubRecord>
        void fc_apply( PubRecord * )
        {
            assert( false );
        }

        template <typename Iterator>
        void fc_process( Iterator, Iterator )
        {
            assert( false );
        }
        //@endcond
    } // namespace flat_combining
}} // namespace cds::algo

#endif // #ifndef CDSLIB_ALGO_FLAT_COMBINING_H