This file is a part of libcds - Concurrent Data Structures library

(C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2016

Source code repo: http://github.com/khizmax/libcds/
Download: http://sourceforge.net/projects/libcds/files/

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:

* Redistributions of source code must retain the above copyright notice, this
  list of conditions and the following disclaimer.

* Redistributions in binary form must reproduce the above copyright notice,
  this list of conditions and the following disclaimer in the documentation
  and/or other materials provided with the distribution.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#ifndef CDSLIB_INTRUSIVE_SEGMENTED_QUEUE_H
#define CDSLIB_INTRUSIVE_SEGMENTED_QUEUE_H

#include <cds/intrusive/details/base.h>
#include <cds/details/marked_ptr.h>
#include <cds/algo/int_algo.h>
#include <cds/sync/spinlock.h>
#include <cds/opt/permutation.h>

#include <boost/intrusive/slist.hpp>
#if CDS_COMPILER == CDS_COMPILER_MSVC
#   pragma warning( push )
#   pragma warning( disable: 4355 ) // warning C4355: 'this' : used in base member initializer list
#endif

namespace cds { namespace intrusive {
/// SegmentedQueue-related declarations
namespace segmented_queue {

/// SegmentedQueue internal statistics. May be used for debugging or profiling
template <typename Counter = cds::atomicity::event_counter >
typedef Counter counter_type;   ///< Counter type
counter_type m_nPush;               ///< Push count
counter_type m_nPushPopulated;      ///< Number of attempts to push to a populated (non-empty) cell
counter_type m_nPushContended;      ///< Number of failed CAS when pushing
counter_type m_nPop;                ///< Pop count
counter_type m_nPopEmpty;           ///< Number of dequeue attempts from an empty queue
counter_type m_nPopContended;       ///< Number of failed CAS when popping

counter_type m_nCreateSegmentReq;   ///< Number of requests to create a new segment
counter_type m_nDeleteSegmentReq;   ///< Number of requests to delete a segment
counter_type m_nSegmentCreated;     ///< Number of created segments
counter_type m_nSegmentDeleted;     ///< Number of deleted segments
void onPush()               { ++m_nPush; }
void onPushPopulated()      { ++m_nPushPopulated; }
void onPushContended()      { ++m_nPushContended; }
void onPop()                { ++m_nPop; }
void onPopEmpty()           { ++m_nPopEmpty; }
void onPopContended()       { ++m_nPopContended; }
void onCreateSegmentReq()   { ++m_nCreateSegmentReq; }
void onDeleteSegmentReq()   { ++m_nDeleteSegmentReq; }
void onSegmentCreated()     { ++m_nSegmentCreated; }
void onSegmentDeleted()     { ++m_nSegmentDeleted; }
/// Dummy SegmentedQueue statistics, no overhead

void onPush() const                 {}
void onPushPopulated() const        {}
void onPushContended() const        {}
void onPopEmpty() const             {}
void onPopContended() const         {}
void onCreateSegmentReq() const     {}
void onDeleteSegmentReq() const     {}
void onSegmentCreated() const       {}
void onSegmentDeleted() const       {}
/// SegmentedQueue default traits

/// Element disposer that is called in \p clear() for each dequeued item. Default is opt::v::empty_disposer (no disposer)
typedef opt::v::empty_disposer disposer;
/// Item counter, default is atomicity::item_counter
The item counting is an essential part of the segmented queue algorithm.
The \p empty() member function is based on checking <tt>size() == 0</tt>.
Therefore, a dummy item counter like atomicity::empty_item_counter is not a suitable counter for this queue.
typedef atomicity::item_counter item_counter;
/// Internal statistics, possible predefined types are \ref stat, \ref empty_stat (the default)
typedef segmented_queue::empty_stat stat;

/// Memory model, default is opt::v::relaxed_ordering. See cds::opt::memory_model for the full list of possible types
typedef opt::v::relaxed_ordering memory_model;

/// Alignment of critical data, default is cache line alignment. See cds::opt::alignment option specification
enum { alignment = opt::cache_line_alignment };
/// Padding of segment data, default is no special padding
The segment is just an array of atomic data pointers,
so high load leads to false sharing and performance degradation.
Padding of the segment data can eliminate the false sharing issue.
On the other hand, padding increases the segment size.
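For example, cache-line padding of the segment cells can be requested via \p make_traits
(an illustrative sketch; it assumes the \p cds::opt::cache_line_padding constant from the \p cds::opt option set):
\code
typedef cds::intrusive::segmented_queue::make_traits<
    cds::opt::padding< cds::opt::cache_line_padding >
>::type padded_traits;
\endcode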
enum { padding = opt::no_special_padding };

/// Segment allocator. Default is \ref CDS_DEFAULT_ALLOCATOR
typedef CDS_DEFAULT_ALLOCATOR allocator;

/// Lock type used to maintain an internal list of allocated segments
typedef cds::sync::spin lock_type;

/// Random \ref cds::opt::permutation_generator "permutation generator" for the sequence [0, quasi_factor)
typedef cds::opt::v::random2_permutation<int> permutation_generator;
/// Metafunction converting option list to traits for SegmentedQueue
The metafunction can be useful if a few fields in \p segmented_queue::traits should be changed.
typedef cds::intrusive::segmented_queue::make_traits<
    cds::opt::item_counter< cds::atomicity::item_counter >
>::type my_segmented_queue_traits;
This code creates \p %SegmentedQueue type traits with the item counting feature;
all other \p %segmented_queue::traits members are left unchanged.
- \p opt::disposer - the functor used to dispose removed items.
- \p opt::stat - internal statistics, possible types: \p segmented_queue::stat, \p segmented_queue::empty_stat (the default)
- \p opt::item_counter - item counting feature. Note that \p atomicity::empty_item_counter is not suitable for this queue.
- \p opt::memory_model - memory model, default is \p opt::v::relaxed_ordering.
  See option description for the full list of possible models
- \p opt::alignment - the alignment for critical data, see option description for explanation
- \p opt::padding - the padding of segment data, default is no special padding.
  See \p traits::padding for explanation.
- \p opt::allocator - the allocator to be used for maintaining segments.
- \p opt::lock_type - a mutual exclusion lock type used to maintain the internal list of allocated
  segments. Default is \p cds::sync::spin; \p std::mutex is also suitable.
- \p opt::permutation_generator - a random permutation generator for the sequence [0, quasi_factor),
  default is \p cds::opt::v::random2_permutation<int>
template <typename... Options>
#   ifdef CDS_DOXYGEN_INVOKED
typedef implementation_defined type;    ///< Metafunction result
typedef typename cds::opt::make_options<
    typename cds::opt::find_type_traits< traits, Options... >::type

} // namespace segmented_queue
/** @ingroup cds_intrusive_queue

The queue is based on the work
- [2010] Afek, Korland, Yanovsky "Quasi-Linearizability: relaxed consistency for improved concurrency"

In this paper the authors offer a relaxed version of linearizability, so-called quasi-linearizability,
that preserves some of the intuition, provides a flexible way to control the level of relaxation,
and supports the implementation of more concurrent and scalable data structures.
Intuitively, linearizability requires each run to be equivalent in some sense to a serial run
of the algorithm. This equivalence to some serial run imposes strong synchronization requirements
that in many cases result in limited scalability and a synchronization bottleneck.
The general idea is that the queue maintains a linked list of segments; each segment is an array of
nodes whose size is the quasi factor, and each node has a deleted boolean marker, which states
whether it has been dequeued. Each producer iterates over the last segment in the linked list in some random
permutation order. When it finds an empty cell it performs a CAS operation attempting to enqueue its
new element. In case the entire segment has been scanned and no available cell is found (implying
that the segment is full), then it attempts to add a new segment to the list.

The dequeue operation is similar: the consumer iterates over the first segment in the linked list
in some random permutation order. When it finds an item which has not yet been dequeued, it performs
a CAS on its deleted marker in order to "delete" it; if this succeeds, the item is considered dequeued.
In case the entire segment was scanned and all the nodes have already been dequeued (implying that
the segment is empty), then it attempts to remove this segment from the linked list and starts
the same process on the next segment. If there is no next segment, the queue is considered empty.
Based on the fact that most of the time threads do not add or remove segments, most of the work
is done in parallel on different cells in the segments. This ensures a controlled contention
depending on the segment size, which is the quasi factor.

The segmented queue is an <i>unfair</i> queue since it violates the strong FIFO order, but by no more than
the quasi factor. This means that the consumer dequeues <i>any</i> item from the current first segment.
Template parameters:
- \p GC - a garbage collector, possible types are cds::gc::HP, cds::gc::DHP
- \p T - the type of values stored in the queue
- \p Traits - queue type traits, default is \p segmented_queue::traits.
  The \p segmented_queue::make_traits metafunction can be used to construct the traits.

The queue stores the pointers to enqueued items so no special node hooks are needed.
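A minimal usage sketch (illustrative only; it assumes a user-defined \p foo value type and
an initialized \p cds::gc::HP garbage collector with the current thread attached to it):
\code
#include <cds/gc/hp.h>
#include <cds/intrusive/segmented_queue.h>

struct foo {
    int payload;
};

typedef cds::intrusive::SegmentedQueue< cds::gc::HP, foo > queue_type;

void example()
{
    queue_type q( 16 );      // quasi factor 16

    foo item;
    q.enqueue( item );       // the queue stores &item, no copy is made

    foo * p = q.dequeue();   // returns some item of the first segment, or nullptr if the queue is empty
    // a returned item must be disposed manually, see \p dequeue()
}
\endcode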
template <class GC, typename T, typename Traits = segmented_queue::traits >

typedef GC     gc;         ///< Garbage collector
typedef T      value_type; ///< type of the value stored in the queue
typedef Traits traits;     ///< Queue traits

typedef typename traits::disposer      disposer;      ///< value disposer, called only in \p clear() for each dequeued element
typedef typename traits::allocator     allocator;     ///< Allocator maintaining the segments
typedef typename traits::memory_model  memory_model;  ///< Memory ordering. See cds::opt::memory_model option
typedef typename traits::item_counter  item_counter;  ///< Item counting policy, see cds::opt::item_counter option setter
typedef typename traits::stat          stat;          ///< Internal statistics policy
typedef typename traits::lock_type     lock_type;     ///< Type of mutex for maintaining an internal list of allocated segments
typedef typename traits::permutation_generator permutation_generator; ///< Random permutation generator for the sequence [0, quasi-factor)

static const size_t c_nHazardPtrCount = 2; ///< Count of hazard pointers required for the algorithm
// Segment cell. LSB is used as deleted mark
typedef cds::details::marked_ptr< value_type, 1 > regular_cell;
typedef atomics::atomic< regular_cell > atomic_cell;
typedef typename cds::opt::details::apply_padding< atomic_cell, traits::padding >::type cell;

struct segment: public boost::intrusive::slist_base_hook<>
cell * cells;    // Cell array of size \ref m_nQuasiFactor
size_t version;  // version tag (ABA prevention tag)
// the cell array is placed here in one contiguous memory block

// Initializes the segment
segment( size_t nCellCount )
    // MSVC warning C4355: 'this': used in base member initializer list
    : cells( reinterpret_cast< cell *>( this + 1 ))

void init( size_t nCellCount )
cell * pLastCell = cells + nCellCount;
for ( cell* pCell = cells; pCell < pLastCell; ++pCell )
    pCell->data.store( regular_cell(), atomics::memory_order_relaxed );
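// Release fence: make the relaxed cell initialization above visible to other threads
// before the new segment is linked into the segment list and published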
atomics::atomic_thread_fence( memory_model::memory_order_release );

typedef typename opt::details::alignment_setter< atomics::atomic<segment *>, traits::alignment >::type aligned_segment_ptr;

typedef boost::intrusive::slist< segment, boost::intrusive::cache_last< true > > list_impl;
typedef std::unique_lock< lock_type > scoped_lock;

aligned_segment_ptr m_pHead;
aligned_segment_ptr m_pTail;

mutable lock_type m_Lock;
size_t const      m_nQuasiFactor;
struct segment_disposer
void operator()( segment * pSegment )
assert( pSegment != nullptr );
free_segment( pSegment );

struct gc_segment_disposer
void operator()( segment * pSegment )
assert( pSegment != nullptr );
retire_segment( pSegment );

segment_list( size_t nQuasiFactor, stat& st )
, m_nQuasiFactor( nQuasiFactor )
assert( cds::beans::is_power2( nQuasiFactor ));

m_List.clear_and_dispose( gc_segment_disposer());
segment * head( typename gc::Guard& guard )
return guard.protect( m_pHead );

segment * tail( typename gc::Guard& guard )
return guard.protect( m_pTail );

bool populated( segment const& s ) const
// The lock should be held
cell const * pLastCell = s.cells + quasi_factor();
for ( cell const * pCell = s.cells; pCell < pLastCell; ++pCell ) {
    if ( !pCell->data.load( memory_model::memory_order_relaxed ).all())

bool exhausted( segment const& s ) const
// The lock should be held
cell const * pLastCell = s.cells + quasi_factor();
for ( cell const * pCell = s.cells; pCell < pLastCell; ++pCell ) {
    if ( !pCell->data.load( memory_model::memory_order_relaxed ).bits())
segment * create_tail( segment * pTail, typename gc::Guard& guard )
// pTail is guarded by GC

m_Stat.onCreateSegmentReq();

scoped_lock l( m_Lock );
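// Under the lock, check whether another thread has already appended a new tail segment.
// The version tag (see segment::version) detects the ABA case where the old tail
// was removed and a new segment was allocated at the same address.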
if ( !m_List.empty() && ( pTail != &m_List.back() || get_version(pTail) != m_List.back().version )) {
    m_pTail.store( &m_List.back(), memory_model::memory_order_relaxed );

    return guard.assign( &m_List.back());

assert( m_List.empty() || populated( m_List.back()));

segment * pNew = allocate_segment();
m_Stat.onSegmentCreated();

m_pHead.store( pNew, memory_model::memory_order_release );
m_List.push_back( *pNew );
m_pTail.store( pNew, memory_model::memory_order_release );
return guard.assign( pNew );
segment * remove_head( segment * pHead, typename gc::Guard& guard )
// pHead is guarded by GC
m_Stat.onDeleteSegmentReq();

scoped_lock l( m_Lock );

if ( m_List.empty()) {
    m_pTail.store( nullptr, memory_model::memory_order_relaxed );
    m_pHead.store( nullptr, memory_model::memory_order_relaxed );
    return guard.assign( nullptr );
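// Check whether another thread has already removed this head segment;
// the version tag guards against the ABA case where a segment was re-allocated at the same address.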
if ( pHead != &m_List.front() || get_version(pHead) != m_List.front().version ) {
    m_pHead.store( &m_List.front(), memory_model::memory_order_relaxed );
    return guard.assign( &m_List.front());

assert( exhausted( m_List.front()));

if ( m_List.empty()) {
    pRet = guard.assign( nullptr );
    m_pTail.store( nullptr, memory_model::memory_order_relaxed );

pRet = guard.assign( &m_List.front());
m_pHead.store( pRet, memory_model::memory_order_release );

retire_segment( pHead );
m_Stat.onSegmentDeleted();

size_t quasi_factor() const
return m_nQuasiFactor;
typedef cds::details::Allocator< segment, allocator > segment_allocator;

static size_t get_version( segment * pSegment )
return pSegment ? pSegment->version : 0;

segment * allocate_segment()
return segment_allocator().NewBlock( sizeof(segment) + sizeof(cell) * m_nQuasiFactor, quasi_factor());

static void free_segment( segment * pSegment )
segment_allocator().Delete( pSegment );

static void retire_segment( segment * pSegment )
gc::template retire<segment_disposer>( pSegment );
segment_list m_SegmentList;   ///< List of segments

item_counter m_ItemCounter;   ///< Item counter
stat         m_Stat;          ///< Internal statistics

/// Initializes the empty queue
size_t nQuasiFactor  ///< Quasi factor. If it is not a power of 2 it is rounded up to the nearest power of 2. Minimum is 2.
: m_SegmentList( cds::beans::ceil2(nQuasiFactor), m_Stat )
static_assert( (!std::is_same< item_counter, cds::atomicity::empty_item_counter >::value),
    "cds::atomicity::empty_item_counter is not supported for SegmentedQueue"
assert( m_SegmentList.quasi_factor() > 1 );
/// Inserts a new element into the last segment of the queue
bool enqueue( value_type& val )
// LSB is used as a flag in marked pointer
assert( (reinterpret_cast<uintptr_t>( &val ) & 1) == 0 );

typename gc::Guard segmentGuard;
segment * pTailSegment = m_SegmentList.tail( segmentGuard );
if ( !pTailSegment ) {
    // no segments yet, create a new one
    pTailSegment = m_SegmentList.create_tail( pTailSegment, segmentGuard );
    assert( pTailSegment );
permutation_generator gen( quasi_factor());

// First, increment the item counter.
// We are sure that the item will be enqueued,
// but if we increment the counter after inserting we can get a negative counter value
// if dequeuing occurs before incrementing (enqueue/dequeue race)

CDS_DEBUG_ONLY( size_t nLoopCount = 0);
typename permutation_generator::integer_type i = gen;
CDS_DEBUG_ONLY( ++nLoopCount );
if ( pTailSegment->cells[i].data.load(memory_model::memory_order_relaxed).all()) {
    // Cell is not empty, go next
    m_Stat.onPushPopulated();

// Empty cell found, try to enqueue here
regular_cell nullCell;
if ( pTailSegment->cells[i].data.compare_exchange_strong( nullCell, regular_cell( &val ),
    memory_model::memory_order_release, atomics::memory_order_relaxed ))

assert( nullCell.ptr());
m_Stat.onPushContended();
} while ( gen.next());

assert( nLoopCount == quasi_factor());

// No available position, create a new segment
pTailSegment = m_SegmentList.create_tail( pTailSegment, segmentGuard );

// Get a new permutation
/// Removes an element from the first segment of the queue and returns it
If the queue is empty the function returns \p nullptr.

The disposer specified in the \p Traits template argument is <b>not</b> called for the returned item.
You should manually dispose the item:
void operator()( foo * p )
cds::intrusive::SegmentedQueue< cds::gc::HP, foo > theQueue;
foo * pItem = theQueue.dequeue();
// pItem is no longer needed and can be deleted
// Do it via gc::HP::retire
cds::gc::HP::template retire< my_disposer >( pItem );
value_type * dequeue()
typename gc::Guard itemGuard;
if ( do_dequeue( itemGuard )) {
    value_type * pVal = itemGuard.template get<value_type>();

/// Synonym for \p enqueue(value_type&) member function
bool push( value_type& val )
return enqueue( val );

/// Synonym for \p dequeue() member function
/// Checks if the queue is empty
The original segmented queue algorithm does not allow checking emptiness accurately
because \p empty() is not linearizable.
This function tests the queue's emptiness by checking <tt>size() == 0</tt>,
so the item counting feature is an essential part of the queue's algorithm.

The function repeatedly calls \p dequeue() until it returns \p nullptr.
The disposer specified in the \p Traits template argument is called for each removed item.
clear_with( disposer());

The function repeatedly calls \p dequeue() until it returns \p nullptr.
\p Disposer is called for each removed item.
template <class Disposer>
void clear_with( Disposer )
typename gc::Guard itemGuard;
while ( do_dequeue( itemGuard )) {
    assert( itemGuard.template get<value_type>());
    gc::template retire<Disposer>( itemGuard.template get<value_type>());
/// Returns the queue's item count
return m_ItemCounter.value();

/// Returns a reference to the internal statistics
The type of internal statistics is specified by the \p Traits template argument.
const stat& statistics() const

/// Returns the quasi factor, a power-of-two number
size_t quasi_factor() const
return m_SegmentList.quasi_factor();
bool do_dequeue( typename gc::Guard& itemGuard )
typename gc::Guard segmentGuard;
segment * pHeadSegment = m_SegmentList.head( segmentGuard );

permutation_generator gen( quasi_factor());
if ( !pHeadSegment ) {

bool bHadNullValue = false;
CDS_DEBUG_ONLY( size_t nLoopCount = 0 );
typename permutation_generator::integer_type i = gen;
CDS_DEBUG_ONLY( ++nLoopCount );

// In the segmented queue a cell cannot be reused,
// so no loop is needed here to protect the cell
item = pHeadSegment->cells[i].data.load( memory_model::memory_order_relaxed );
itemGuard.assign( item.ptr());

// Check if this cell is empty, which means an element
// can be enqueued to this cell in the future
bHadNullValue = true;

// If the item is not deleted yet
// Try to mark the cell as deleted
if ( pHeadSegment->cells[i].data.compare_exchange_strong( item, item | 1,
    memory_model::memory_order_acquire, atomics::memory_order_relaxed ))

assert( item.bits());
m_Stat.onPopContended();
} while ( gen.next());
assert( nLoopCount == quasi_factor());

// The entire segment was scanned without finding a candidate to dequeue.
// If there was an empty cell, the queue is considered empty
if ( bHadNullValue ) {

// All nodes have been dequeued, we can safely remove the first segment
pHeadSegment = m_SegmentList.remove_head( pHeadSegment, segmentGuard );

// Get a new permutation
}} // namespace cds::intrusive

#if CDS_COMPILER == CDS_COMPILER_MSVC
#   pragma warning( pop )
#endif

#endif // #ifndef CDSLIB_INTRUSIVE_SEGMENTED_QUEUE_H