3 #ifndef __CDS_INTRUSIVE_SEGMENTED_QUEUE_H
4 #define __CDS_INTRUSIVE_SEGMENTED_QUEUE_H
7 #include <cds/intrusive/details/base.h>
8 #include <cds/details/marked_ptr.h>
9 #include <cds/algo/int_algo.h>
10 #include <cds/lock/spinlock.h>
11 #include <cds/opt/permutation.h>
13 #include <boost/intrusive/slist.hpp>
15 #if CDS_COMPILER == CDS_COMPILER_MSVC
16 # pragma warning( push )
17 # pragma warning( disable: 4355 ) // warning C4355: 'this' : used in base member initializer list
20 namespace cds { namespace intrusive {
22 /// SegmentedQueue -related declarations
23 namespace segmented_queue {
/// SegmentedQueue internal statistics. May be used for debugging or profiling
// NOTE(review): the enclosing "struct stat" declaration line and closing brace
// are not visible in this extract - lines appear to be missing; confirm
// against the full header.
template <typename Counter = cds::atomicity::event_counter >
typedef Counter counter_type; ///< Counter type

// Raw event counters, one per observable queue event.
counter_type m_nPush; ///< Push count
counter_type m_nPushPopulated; ///< Number of attempts to push to populated (non-empty) cell
counter_type m_nPushContended; ///< Number of failed CAS when pushing
counter_type m_nPop; ///< Pop count
counter_type m_nPopEmpty; ///< Number of dequeuing from empty queue
counter_type m_nPopContended; ///< Number of failed CAS when popping
counter_type m_nCreateSegmentReq; ///< Number of requests to create a new segment
counter_type m_nDeleteSegmentReq; ///< Number of requests to delete a segment
counter_type m_nSegmentCreated; ///< Number of created segments
counter_type m_nSegmentDeleted; ///< Number of deleted segments

// Event handlers: each one increments the corresponding counter.
// The queue implementation calls these at the matching points of the algorithm.
void onPush() { ++m_nPush; }
void onPushPopulated() { ++m_nPushPopulated; }
void onPushContended() { ++m_nPushContended; }
void onPop() { ++m_nPop; }
void onPopEmpty() { ++m_nPopEmpty; }
void onPopContended() { ++m_nPopContended; }
void onCreateSegmentReq() { ++m_nCreateSegmentReq; }
void onDeleteSegmentReq() { ++m_nDeleteSegmentReq; }
void onSegmentCreated() { ++m_nSegmentCreated; }
void onSegmentDeleted() { ++m_nSegmentDeleted; }
/// Dummy SegmentedQueue statistics, no overhead
// All event handlers are empty no-ops, so a build using empty_stat pays
// no cost for statistics collection.
// NOTE(review): the enclosing struct declaration line and an onPop()
// handler are not visible in this extract - lines appear to be missing.
void onPush() const {}
void onPushPopulated() const {}
void onPushContended() const {}
void onPopEmpty() const {}
void onPopContended() const {}
void onCreateSegmentReq() const {}
void onDeleteSegmentReq() const {}
void onSegmentCreated() const {}
void onSegmentDeleted() const {}
/// SegmentedQueue default traits
// NOTE(review): the enclosing "struct traits" declaration line is not
// visible in this extract - lines appear to be missing.

/// Element disposer that is called when the item is dequeued. Default is opt::v::empty_disposer (no disposer)
typedef opt::v::empty_disposer disposer;

/// Item counter, default is atomicity::item_counter
/**
    The item counting is an essential part of segmented queue algorithm.
    The \p empty() member function is based on checking <tt>size() == 0</tt>.
    Therefore, a dummy item counter like atomicity::empty_item_counter is not a proper counter.
*/
typedef atomicity::item_counter item_counter;

/// Internal statistics, possible predefined types are \ref stat, \ref empty_stat (the default)
typedef segmented_queue::empty_stat stat;

/// Memory model, default is opt::v::relaxed_ordering. See cds::opt::memory_model for the full list of possible types
typedef opt::v::relaxed_ordering memory_model;

/// Alignment of critical data, default is cache line alignment. See cds::opt::alignment option specification
enum { alignment = opt::cache_line_alignment };

/// Segment allocator. Default is \ref CDS_DEFAULT_ALLOCATOR
typedef CDS_DEFAULT_ALLOCATOR allocator;

/// Lock type used to maintain an internal list of allocated segments
typedef cds::lock::Spin lock_type;

/// Random \ref cds::opt::permutation_generator "permutation generator" for sequence [0, quasi_factor)
typedef cds::opt::v::random2_permutation<int> permutation_generator;
/// Metafunction converting option list to traits for SegmentedQueue
/**
    The metafunction can be useful if a few fields in \p segmented_queue::traits should be changed.
    For example:
    \code
    typedef cds::intrusive::segmented_queue::make_traits<
        cds::opt::item_counter< cds::atomicity::item_counter >
    >::type my_segmented_queue_traits;
    \endcode
    This code creates \p %SegmentedQueue type traits with item counting feature,
    all other \p %segmented_queue::traits members left unchanged.

    \p Options are:
    - \p opt::disposer - the functor used for dispose removed items.
    - \p opt::stat - internal statistics, possible type: \p segmented_queue::stat, \p segmented_queue::empty_stat (the default)
    - \p opt::item_counter - item counting feature. Note that \p atomicity::empty_item_counter is not suitable
    - \p opt::memory_model - memory model, default is \p opt::v::relaxed_ordering.
        See option description for the full list of possible models
    - \p opt::alignment - the alignment for critical data, see option description for explanation
    - \p opt::allocator - the allocator to be used for maintaining segments.
    - \p opt::lock_type - a mutual exclusion lock type used to maintain internal list of allocated
        segments. Default is \p cds::opt::Spin, \p std::mutex is also suitable.
    - \p opt::permutation_generator - a random permutation generator for sequence [0, quasi_factor),
        default is \p cds::opt::v::random2_permutation<int>
*/
// NOTE(review): the "struct make_traits" declaration line, the "#else"
// branch delimiters and the closing of the metafunction are not visible
// in this extract - lines appear to be missing.
template <typename... Options>
# ifdef CDS_DOXYGEN_INVOKED
typedef implementation_defined type ; ///< Metafunction result
typedef typename cds::opt::make_options<
    typename cds::opt::find_type_traits< traits, Options... >::type
142 } // namespace segmented_queue
145 /** @ingroup cds_intrusive_queue
147 The queue is based on work
148 - [2010] Afek, Korland, Yanovsky "Quasi-Linearizability: relaxed consistency for improved concurrency"
150 In this paper the authors offer a relaxed version of linearizability, so-called quasi-linearizability,
151 that preserves some of the intuition, provides a flexible way to control the level of relaxation
152 and supports the implementation of more concurrent and scalable data structures.
153 Intuitively, the linearizability requires each run to be equivalent in some sense to a serial run
154 of the algorithm. This equivalence to some serial run imposes strong synchronization requirements
155 that in many cases results in limited scalability and synchronization bottleneck.
157 The general idea is that the queue maintains a linked list of segments, each segment is an array of
158 nodes in the size of the quasi factor, and each node has a deleted boolean marker, which states
159 if it has been dequeued. Each producer iterates over last segment in the linked list in some random
160 permutation order. When it finds an empty cell it performs a CAS operation attempting to enqueue its
161 new element. In case the entire segment has been scanned and no available cell is found (implying
162 that the segment is full), then it attempts to add a new segment to the list.
164 The dequeue operation is similar: the consumer iterates over the first segment in the linked list
165 in some random permutation order. When it finds an item which has not yet been dequeued, it performs
166 CAS on its deleted marker in order to "delete" it, if succeeded this item is considered dequeued.
167 In case the entire segment was scanned and all the nodes have already been dequeued (implying that
168 the segment is empty), then it attempts to remove this segment from the linked list and starts
169 the same process on the next segment. If there is no next segment, the queue is considered empty.
171 Based on the fact that most of the time threads do not add or remove segments, most of the work
172 is done in parallel on different cells in the segments. This ensures a controlled contention
173 depending on the segment size, which is quasi factor.
175 The segmented queue is an <i>unfair</i> queue since it violates the strong FIFO order but no more than
176 quasi factor. This means that the consumer dequeues <i>any</i> item from the current first segment.
179 - \p GC - a garbage collector, possible types are cds::gc::HP, cds::gc::PTB
180 - \p T - the type of values stored in the queue
181 - \p Traits - queue type traits, default is \p segmented_queue::traits.
182 \p segmented_queue::make_traits metafunction can be used to construct the
185 The queue stores the pointers to enqueued items so no special node hooks are needed.
template <class GC, typename T, typename Traits = segmented_queue::traits >
// NOTE(review): the "class SegmentedQueue" declaration line is not visible
// in this extract - it appears to be missing between the template header
// and the member typedefs below.
typedef GC gc; ///< Garbage collector
typedef T value_type; ///< type of the value stored in the queue
typedef Traits traits; ///< Queue traits

typedef typename traits::disposer disposer ; ///< value disposer, called only in \p clear() when the element to be dequeued
typedef typename traits::allocator allocator; ///< Allocator maintaining the segments
typedef typename traits::memory_model memory_model; ///< Memory ordering. See cds::opt::memory_model option
typedef typename traits::item_counter item_counter; ///< Item counting policy, see cds::opt::item_counter option setter
typedef typename traits::stat stat; ///< Internal statistics policy
typedef typename traits::lock_type lock_type; ///< Type of mutex for maintaining an internal list of allocated segments.
typedef typename traits::permutation_generator permutation_generator; ///< Random permutation generator for sequence [0, quasi-factor)

static const size_t m_nHazardPtrCount = 2 ; ///< Count of hazard pointer required for the algorithm

// Segment cell. LSB is used as deleted mark
typedef cds::details::marked_ptr< value_type, 1 > cell;
// A segment: fixed-size array of cells hooked into an intrusive
// singly-linked list of segments. The cell array is co-allocated
// immediately after the segment object itself (see allocate_segment()).
struct segment: public boost::intrusive::slist_base_hook<>
    atomics::atomic< cell > * cells; // Cell array of size \ref m_nQuasiFactor
    size_t version; // version tag (ABA prevention tag)
    // cell array is placed here in one continuous memory block

    // Initializes the segment
    segment( size_t nCellCount )
        // MSVC warning C4355: 'this': used in base member initializer list
        // 'this + 1' points just past the segment header, i.e. at the
        // co-allocated cell array.
        : cells( reinterpret_cast< atomics::atomic< cell > * >( this + 1 ))
    // NOTE(review): the rest of the constructor (remaining initializers /
    // body) is not visible in this extract - lines appear to be missing.

    // Resets every cell to the empty marked pointer; the release fence
    // publishes the initialized array before the segment becomes visible.
    void init( size_t nCellCount )
        atomics::atomic< cell > * pLastCell = cells + nCellCount;
        for ( atomics::atomic< cell > * pCell = cells; pCell < pLastCell; ++pCell )
            pCell->store( cell(), atomics::memory_order_relaxed );
        atomics::atomic_thread_fence( memory_model::memory_order_release );
// NOTE(review): these members appear to belong to an internal segment-list
// class whose declaration line is not visible in this extract.
typedef typename opt::details::alignment_setter< atomics::atomic<segment *>, traits::alignment >::type aligned_segment_ptr;

// Intrusive slist with cached last element - gives O(1) access to the tail.
typedef boost::intrusive::slist< segment, boost::intrusive::cache_last< true > > list_impl;
typedef std::unique_lock< lock_type > scoped_lock;

aligned_segment_ptr m_pHead;   // atomic pointer to the first (oldest) segment
aligned_segment_ptr m_pTail;   // atomic pointer to the last (newest) segment

mutable lock_type m_Lock;      // protects structural changes to the segment list
size_t const m_nQuasiFactor;   // number of cells per segment (power of 2)

// Disposer that frees a segment immediately (used on destruction path).
struct segment_disposer
    void operator()( segment * pSegment )
        assert( pSegment != nullptr );
        free_segment( pSegment );

// Disposer that hands a segment to the GC for deferred reclamation.
struct gc_segment_disposer
    void operator()( segment * pSegment )
        assert( pSegment != nullptr );
        retire_segment( pSegment );

segment_list( size_t nQuasiFactor, stat& st )
    , m_nQuasiFactor( nQuasiFactor )
    // NOTE(review): other member initializers are not visible here.
    assert( cds::beans::is_power2( nQuasiFactor ));

    // Destructor path: retire every remaining segment via the GC.
    m_List.clear_and_dispose( gc_segment_disposer() );

// Returns the head segment, protected by the caller's hazard-pointer guard.
segment * head( typename gc::Guard& guard )
    return guard.protect( m_pHead );

// Returns the tail segment, protected by the caller's hazard-pointer guard.
segment * tail( typename gc::Guard& guard )
    return guard.protect( m_pTail );

// True iff every cell of the segment holds a value (pointer part non-null).
bool populated( segment const& s ) const
    // The lock should be held
    atomics::atomic< cell > const * pLastCell = s.cells + quasi_factor();
    for ( atomics::atomic< cell > const * pCell = s.cells; pCell < pLastCell; ++pCell ) {
        if ( !pCell->load( memory_model::memory_order_relaxed ).all() )

// True iff every cell of the segment has its deleted bit set.
bool exhausted( segment const& s ) const
    // The lock should be held
    atomics::atomic< cell > const * pLastCell = s.cells + quasi_factor();
    for ( atomics::atomic< cell > const * pCell = s.cells; pCell < pLastCell; ++pCell ) {
        if ( !pCell->load( memory_model::memory_order_relaxed ).bits() )
// Appends a new tail segment (or returns the tail another thread already
// appended). pTail is the caller's view of the tail; the version tag is
// compared to detect that pTail was recycled (ABA) while unprotected.
segment * create_tail( segment * pTail, typename gc::Guard& guard )
    // pTail is guarded by GC

    m_Stat.onCreateSegmentReq();

    scoped_lock l( m_Lock );

    // Another thread may have already appended a new tail: if the caller's
    // pTail is no longer the list's back (by address or by version tag),
    // just publish and return the current back - no allocation needed.
    if ( !m_List.empty() && ( pTail != &m_List.back() || get_version(pTail) != m_List.back().version )) {
        m_pTail.store( &m_List.back(), memory_model::memory_order_relaxed );

        return guard.assign( &m_List.back() );

    // We only create a new segment when the current tail is full.
    assert( m_List.empty() || populated( m_List.back() ));

    segment * pNew = allocate_segment();
    m_Stat.onSegmentCreated();

    // First segment ever: it is the head too.
    if ( m_List.empty() )
        m_pHead.store( pNew, memory_model::memory_order_relaxed );
    m_List.push_back( *pNew );
    // Release order publishes the fully initialized segment to readers.
    m_pTail.store( pNew, memory_model::memory_order_release );
    return guard.assign( pNew );

// Unlinks the exhausted head segment and returns the new head (nullptr if
// the list became empty). Mirrors create_tail's version-tag ABA check.
segment * remove_head( segment * pHead, typename gc::Guard& guard )
    // pHead is guarded by GC
    m_Stat.onDeleteSegmentReq();

    scoped_lock l( m_Lock );

    if ( m_List.empty() ) {
        m_pTail.store( nullptr, memory_model::memory_order_relaxed );
        m_pHead.store( nullptr, memory_model::memory_order_relaxed );
        return guard.assign( nullptr );

    // Another thread already removed the caller's head: resynchronize
    // m_pHead with the list front and return it.
    if ( pHead != &m_List.front() || get_version(pHead) != m_List.front().version ) {
        m_pHead.store( &m_List.front(), memory_model::memory_order_relaxed );
        return guard.assign( &m_List.front() );

    // Only a fully dequeued (exhausted) segment may be removed.
    assert( exhausted(m_List.front()) );

    // NOTE(review): the pop_front() call and the declaration of pRet are
    // not visible in this extract - lines appear to be missing.
    if ( m_List.empty() ) {
        pRet = guard.assign( nullptr );
        m_pTail.store( nullptr, memory_model::memory_order_relaxed );
        pRet = guard.assign( &m_List.front() );
        m_pHead.store( pRet, memory_model::memory_order_release );

    // Defer reclamation of the removed segment to the GC: other threads
    // may still hold hazard pointers to it.
    retire_segment( pHead );
    m_Stat.onSegmentDeleted();
// Number of cells per segment (always a power of 2).
size_t quasi_factor() const
    return m_nQuasiFactor;

typedef cds::details::Allocator< segment, allocator > segment_allocator;

// Version tag of a segment; 0 for a null pointer.
static size_t get_version( segment * pSegment )
    return pSegment ? pSegment->version : 0;

// Allocates one memory block holding the segment header followed by
// m_nQuasiFactor cells (see segment's constructor, which relies on this
// layout via 'this + 1').
segment * allocate_segment()
    return segment_allocator().NewBlock( sizeof(segment) + sizeof(cell) * m_nQuasiFactor,

// Immediately destroys and frees a segment (destruction path only).
static void free_segment( segment * pSegment )
    segment_allocator().Delete( pSegment );

// Hands a segment to the GC; free_segment runs when it is safe.
static void retire_segment( segment * pSegment )
    gc::template retire<segment_disposer>( pSegment );
segment_list m_SegmentList; ///< List of segments
item_counter m_ItemCounter; ///< Item counter
stat m_Stat; ///< Internal statistics

/// Initializes the empty queue
// NOTE(review): the constructor's declaration line is not fully visible in
// this extract - only the parameter line appears below.
    size_t nQuasiFactor ///< Quasi factor. If it is not a power of 2 it is rounded up to nearest power of 2. Minimum is 2.
    : m_SegmentList( cds::beans::ceil2(nQuasiFactor), m_Stat )
    // empty() relies on size() == 0, so a dummy item counter would make
    // emptiness detection impossible - reject it at compile time.
    static_assert( (!std::is_same< item_counter, cds::atomicity::empty_item_counter >::value),
        "cds::atomicity::empty_item_counter is not supported for SegmentedQueue"
    assert( m_SegmentList.quasi_factor() > 1 );
/// Clears the queue and deletes all internal data
// NOTE(review): the destructor's body is not visible in this extract.

/// Inserts a new element at last segment of the queue
// Scans the tail segment's cells in a random permutation order; CASes the
// item pointer into the first empty cell found. If the whole segment is
// populated, appends a new tail segment and retries.
bool enqueue( value_type& val )
    // LSB is used as a flag in marked pointer
    assert( (reinterpret_cast<uintptr_t>( &val ) & 1) == 0 );

    typename gc::Guard segmentGuard;
    segment * pTailSegment = m_SegmentList.tail( segmentGuard );
    if ( !pTailSegment ) {
        // no segments, create the new one
        pTailSegment = m_SegmentList.create_tail( pTailSegment, segmentGuard );
        assert( pTailSegment );

    permutation_generator gen( quasi_factor() );

    // First, increment item counter.
    // We sure that the item will be enqueued
    // but if we increment the counter after inserting we can get a negative counter value
    // if dequeuing occurs before incrementing (enqueue/dequeue race)

    CDS_DEBUG_ONLY( size_t nLoopCount = 0);
        typename permutation_generator::integer_type i = gen;
        CDS_DEBUG_ONLY( ++nLoopCount );
        if ( pTailSegment->cells[i].load(memory_model::memory_order_relaxed).all() ) {
            // Cell is not empty, go next
            m_Stat.onPushPopulated();
            // Empty cell found, try to enqueue here
            // NOTE(review): the declaration of nullCell is not visible in
            // this extract - a line appears to be missing.
            if ( pTailSegment->cells[i].compare_exchange_strong( nullCell, cell( &val ),
                memory_model::memory_order_release, atomics::memory_order_relaxed ))

            // CAS failed: another producer won the cell.
            assert( nullCell.ptr() );
            m_Stat.onPushContended();
    } while ( gen.next() );

    assert( nLoopCount == quasi_factor());

    // No available position, create a new segment
    pTailSegment = m_SegmentList.create_tail( pTailSegment, segmentGuard );

    // Get new permutation
/// Removes an element from first segment of the queue and returns it
/**
    If the queue is empty the function returns \p nullptr.

    The disposer specified in \p Traits template argument is <b>not</b> called for returned item.
    You should manually dispose the item:
    \code
    struct my_disposer {
        void operator()( foo * p )
        { ... }
    };
    cds::intrusive::SegmentedQueue< cds::gc::HP, foo > theQueue;
    // ...
    foo * pItem = theQueue.dequeue();
    // ...
    // pItem is not longer needed and can be deleted
    // Do it via gc::HP::retire
    cds::gc::HP::template retire< my_disposer >( pItem );
    \endcode
*/
value_type * dequeue()
    typename gc::Guard itemGuard;
    if ( do_dequeue( itemGuard )) {
        value_type * pVal = itemGuard.template get<value_type>();
    // NOTE(review): the return statements of dequeue() are not visible in
    // this extract - lines appear to be missing.

/// Synonym for \p enqueue(value_type&) member function
bool push( value_type& val )
    return enqueue( val );
/// Synonym for \p dequeue() member function
// NOTE(review): the pop() definition itself is not visible in this extract.

/// Checks if the queue is empty
/**
    The original segmented queue algorithm does not allow to check emptiness accurately
    because \p empty() is unlinearizable.
    This function tests queue's emptiness checking <tt>size() == 0</tt>,
    so, the item counting feature is an essential part of queue's algorithm.
*/

/// Clear the queue
/**
    The function repeatedly calls \ref dequeue until it returns \p nullptr.
    The disposer specified in \p Traits template argument is called for each removed item.
*/
    clear_with( disposer() );

/// Clear the queue
/**
    The function repeatedly calls \p dequeue() until it returns \p nullptr.
    \p Disposer is called for each removed item.
*/
template <class Disposer>
void clear_with( Disposer )
    typename gc::Guard itemGuard;
    // Drain the queue; each dequeued item is retired via the GC with the
    // user-supplied Disposer.
    while ( do_dequeue( itemGuard ) ) {
        assert( itemGuard.template get<value_type>() );
        gc::template retire<Disposer>( itemGuard.template get<value_type>() );

/// Returns queue's item count
    return m_ItemCounter.value();

/// Returns reference to internal statistics
/**
    The type of internal statistics is specified by \p Traits template argument.
*/
const stat& statistics() const

/// Returns quasi factor, a power-of-two number
size_t quasi_factor() const
    return m_SegmentList.quasi_factor();
// Core dequeue: scans the head segment's cells in a random permutation
// order and tries to CAS the deleted bit onto the first live item found.
// On success the item pointer is left protected in itemGuard and the
// function reports success; if the segment is fully exhausted, the head
// segment is removed and the scan restarts on the next one.
bool do_dequeue( typename gc::Guard& itemGuard )
    typename gc::Guard segmentGuard;
    segment * pHeadSegment = m_SegmentList.head( segmentGuard );

    permutation_generator gen( quasi_factor() );

    // No head segment: the queue has never had items (or all segments gone).
    if ( !pHeadSegment ) {

    // Set when an empty (never-filled) cell is seen: the segment cannot be
    // exhausted then, and the queue is treated as empty for this attempt.
    bool bHadNullValue = false;

    CDS_DEBUG_ONLY( size_t nLoopCount = 0 );
        typename permutation_generator::integer_type i = gen;
        CDS_DEBUG_ONLY( ++nLoopCount );

        // In segmented queue the cell cannot be reused
        // So no loop is needed here to protect the cell
        item = pHeadSegment->cells[i].load( memory_model::memory_order_relaxed );
        itemGuard.assign( item.ptr() );

        // Check if this cell is empty, which means an element
        // can be enqueued to this cell in the future
        bHadNullValue = true;

        // If the item is not deleted yet
        if ( !item.bits() ) {
            // Try to mark the cell as deleted
            if ( pHeadSegment->cells[i].compare_exchange_strong( item, item | 1,
                memory_model::memory_order_acquire, atomics::memory_order_relaxed ))

            // CAS failed: another consumer deleted this item first.
            assert( item.bits() );
            m_Stat.onPopContended();
    } while ( gen.next() );

    assert( nLoopCount == quasi_factor() );

    // scanning the entire segment without finding a candidate to dequeue
    // If there was an empty cell, the queue is considered empty
    if ( bHadNullValue ) {

    // All nodes have been dequeued, we can safely remove the first segment
    pHeadSegment = m_SegmentList.remove_head( pHeadSegment, segmentGuard );

    // Get new permutation
676 }} // namespace cds::intrusive
678 #if CDS_COMPILER == CDS_COMPILER_MSVC
679 # pragma warning( pop )
682 #endif // #ifndef __CDS_INTRUSIVE_SEGMENTED_QUEUE_H