3 #ifndef __CDS_INTRUSIVE_MSPRIORITY_QUEUE_H
4 #define __CDS_INTRUSIVE_MSPRIORITY_QUEUE_H
6 #include <cds/intrusive/base.h>
7 #include <cds/lock/spinlock.h>
8 #include <cds/os/thread.h>
9 #include <cds/details/bit_reverse_counter.h>
10 #include <cds/intrusive/options.h>
11 #include <cds/opt/buffer.h>
12 #include <cds/opt/compare.h>
13 #include <cds/details/bounded_container.h>
16 namespace cds { namespace intrusive {
18 /// MSPriorityQueue related definitions
19 /** @ingroup cds_intrusive_helper
21 namespace mspriority_queue {
23 /// MSPriorityQueue statistics
24 template <typename Counter = cds::atomicity::event_counter>
26 typedef Counter event_counter ; ///< Event counter type
28 event_counter m_nPushCount ; ///< Count of successful push operations
29 event_counter m_nPopCount ; ///< Count of successful pop operations
30 event_counter m_nPushFailCount ; ///< Count of failed ("the queue is full") push operations
31 event_counter m_nPopFailCount ; ///< Count of failed ("the queue is empty") pop operations
32 event_counter m_nPushHeapifySwapCount ; ///< Count of item swaps performed while heapifying in push
33 event_counter m_nPopHeapifySwapCount ; ///< Count of item swaps performed while heapifying in pop
36 void onPushSuccess() { ++m_nPushCount ;} // Increments m_nPushCount
37 void onPopSuccess() { ++m_nPopCount ;} // Increments m_nPopCount
38 void onPushFailed() { ++m_nPushFailCount ;} // Increments m_nPushFailCount
39 void onPopFailed() { ++m_nPopFailCount ;} // Increments m_nPopFailCount
40 void onPushHeapifySwap() { ++m_nPushHeapifySwapCount ;} // Increments m_nPushHeapifySwapCount
41 void onPopHeapifySwap() { ++m_nPopHeapifySwapCount ;} // Increments m_nPopHeapifySwapCount
45 /// MSPriorityQueue empty statistics
48 void onPushSuccess() {} // No-op: nothing is counted
49 void onPopSuccess() {} // No-op: nothing is counted
50 void onPushFailed() {} // No-op: nothing is counted
52 void onPushHeapifySwap() {} // No-op: nothing is counted
53 void onPopHeapifySwap() {} // No-op: nothing is counted
57 /// Type traits for MSPriorityQueue
61 The storage type for the heap array. Default is cds::opt::v::dynamic_buffer.
63 You may specify any type of buffer's value since at instantiation time
64 the \p buffer::rebind member metafunction is called to change type
65 of values stored in the buffer.
67 typedef opt::v::dynamic_buffer<void *> buffer;
69 /// Priority compare functor
71 No default functor is provided. If the option is not specified, the \p less is used.
73 typedef opt::none compare;
75 /// specifies binary predicate used for priority comparing.
77 Default is \p std::less<T>.
79 typedef opt::none less;
81 /// Type of mutual-exclusion lock
82 typedef lock::Spin lock_type;
85 typedef backoff::yield back_off;
87 /// Internal statistics
89 Possible types: mspriority_queue::empty_stat (the default), mspriority_queue::stat
90 or any other with interface like \p %mspriority_queue::stat
92 typedef empty_stat stat;
95 /// Metafunction converting option list to traits
97 This is a wrapper for <tt> cds::opt::make_options< type_traits, Options...> </tt>
99 See \ref MSPriorityQueue, \ref type_traits, \ref cds::opt::make_options.
101 template <CDS_DECL_OPTIONS7>
103 # ifdef CDS_DOXYGEN_INVOKED
104 typedef implementation_defined type ; ///< Metafunction result
106 typedef typename cds::opt::make_options<
107 typename cds::opt::find_type_traits< type_traits, CDS_OPTIONS7 >::type
113 } // namespace mspriority_queue
115 /// Michael & Scott array-based lock-based concurrent priority queue heap
116 /** @ingroup cds_intrusive_priority_queue
118 - [1996] G.Hunt, M.Michael, S. Parthasarathy, M.Scott
119 "An efficient algorithm for concurrent priority queue heaps"
121 \p %MSPriorityQueue augments the standard array-based heap data structure with
122 a mutual-exclusion lock on the heap's size and locks on each node in the heap.
123 Each node also has a tag that indicates whether
124 it is empty, valid, or in a transient state due to an update to the heap
125 by an inserting thread.
126 The algorithm allows concurrent insertions and deletions in opposite directions,
127 without risking deadlock and without the need for special server threads.
128 It also uses a "bit-reversal" technique to scatter accesses across the fringe
129 of the tree to reduce contention.
130 On large heaps the algorithm achieves significant performance improvements
131 over serialized single-lock algorithm, for various insertion/deletion
132 workloads. For small heaps it still performs well, but not as well as
133 single-lock algorithm.
136 - \p T - type to be stored in the queue. The priority is a part of \p T type.
137 - \p Traits - type traits. See mspriority_queue::type_traits for explanation.
139 It is possible to declare an option-based queue with the cds::intrusive::mspriority_queue::make_traits
140 metafunction instead of the \p Traits template argument.
141 The template arguments \p Options of the \p %cds::intrusive::mspriority_queue::make_traits metafunction are:
142 - opt::buffer - the buffer type for heap array. Possible types are: opt::v::static_buffer, opt::v::dynamic_buffer.
143 Default is \p %opt::v::dynamic_buffer.
144 You may specify any type of values for the buffer since at instantiation time
145 the \p buffer::rebind member metafunction is called to change the type of values stored in the buffer.
146 - opt::compare - priority compare functor. No default functor is provided.
147 If the option is not specified, the opt::less is used.
148 - opt::less - specifies binary predicate used for priority compare. Default is \p std::less<T>.
149 - opt::lock_type - lock type. Default is cds::lock::Spin.
150 - opt::back_off - back-off strategy. Default is cds::backoff::yield
151 - opt::stat - internal statistics. Available types: mspriority_queue::stat, mspriority_queue::empty_stat (the default)
153 template <typename T, class Traits>
154 class MSPriorityQueue: public cds::bounded_container
157 typedef T value_type ; ///< Value type stored in the queue
158 typedef Traits traits ; ///< Traits template parameter
160 # ifdef CDS_DOXYGEN_INVOKED
161 typedef implementation_defined key_comparator ; ///< priority comparing functor based on opt::compare and opt::less option setter.
163 typedef typename opt::details::make_comparator< value_type, traits >::type key_comparator;
166 typedef typename traits::lock_type lock_type ; ///< heap's size lock type
167 typedef typename traits::back_off back_off ; ///< Back-off strategy
168 typedef typename traits::stat stat ; ///< internal statistics type
172 typedef cds::OS::ThreadId tag_type;
183 value_type * m_pVal ; ///< A value pointer
184 tag_type volatile m_nTag ; ///< A tag
185 mutable lock_type m_Lock ; ///< Node-level lock
187 /// Creates empty node
190 , m_nTag( tag_type(Empty) )
209 # ifndef CDS_CXX11_LAMBDA_SUPPORT
212 void operator()( value_type const& ) const
219 typedef typename traits::buffer::template rebind<node>::other buffer_type ; ///< Heap array buffer type
222 typedef cds::bitop::bit_reverse_counter<> item_counter_type;
223 typedef typename item_counter_type::counter_type counter_type;
227 item_counter_type m_ItemCounter ; ///< Item counter
228 mutable lock_type m_Lock ; ///< Heap's size lock
229 buffer_type m_Heap ; ///< Heap array
230 stat m_Stat ; ///< internal statistics accumulator
233 /// Constructs empty priority queue
235 For cds::opt::v::static_buffer the \p nCapacity parameter is ignored.
237 MSPriorityQueue( size_t nCapacity )
238 : m_Heap( nCapacity )
241 /// Clears priority queue and destructs the object
247 /// Inserts an item into priority queue
249 If the priority queue is full, the function returns \p false,
250 no item has been added.
251 Otherwise, the function inserts \p val itself (by pointer, without copying) into the heap
254 The queue is intrusive: it stores a pointer to \p val, so \p val must remain valid while it is in the queue.
256 bool push( value_type& val )
258 tag_type const curId = cds::OS::getCurrentThreadId();
260 // Insert new item at bottom of the heap
262 if ( m_ItemCounter.value() >= capacity() ) {
265 m_Stat.onPushFailed();
269 counter_type i = m_ItemCounter.inc();
270 assert( i < m_Heap.capacity() );
272 node& refNode = m_Heap[i];
275 refNode.m_pVal = &val;
276 refNode.m_nTag = curId;
279 // Move item towards top of the heap while it has higher priority than parent
280 heapify_after_push( i, curId );
282 m_Stat.onPushSuccess();
286 /// Extracts item with high priority
288 If the priority queue is empty, the function returns \p nullptr.
289 Otherwise, it returns the item extracted.
291 The item returned may be disposed immediately.
296 if ( m_ItemCounter.value() == 0 ) {
299 m_Stat.onPopFailed();
302 counter_type nBottom = m_ItemCounter.reversed_value();
304 // Since m_Heap[0] is not used, capacity() returns m_Heap.capacity() - 1
305 // Consequently, "<=" is here
306 assert( nBottom <= capacity() );
307 assert( nBottom > 0 );
309 node& refBottom = m_Heap[ nBottom ];
312 refBottom.m_nTag = tag_type(Empty);
313 value_type * pVal = refBottom.m_pVal;
314 refBottom.m_pVal = nullptr;
317 node& refTop = m_Heap[ 1 ];
319 if ( refTop.m_nTag == tag_type(Empty) ) {
322 m_Stat.onPopSuccess();
326 std::swap( refTop.m_pVal, pVal );
327 refTop.m_nTag = tag_type( Available );
329 assert( nBottom > 1 );
331 // refTop will be unlocked inside heapify_after_pop
332 heapify_after_pop( 1, &refTop );
334 m_Stat.onPopSuccess();
338 /// Clears the queue (not atomic)
340 This function is not atomic, but it is thread-safe
344 # ifdef CDS_CXX11_LAMBDA_SUPPORT
345 clear_with( []( value_type const& src ) {} );
347 clear_with( empty_cleaner() );
351 /// Clears the queue (not atomic)
353 This function is not atomic, but it is thread-safe.
355 For each item removed the functor \p f is called.
356 \p Func interface is:
360 void operator()( value_type& item );
363 A lambda function or a function pointer can be used as \p f.
365 template <typename Func>
366 void clear_with( Func f )
369 value_type * pVal = pop();
371 cds::unref(f)( *pVal );
375 /// Checks if the priority queue is empty
381 /// Checks if the priority queue is full
384 return size() == capacity();
387 /// Returns current size of priority queue
391 size_t nSize = (size_t) m_ItemCounter.value();
396 /// Return capacity of the priority queue
397 size_t capacity() const
399 // m_Heap[0] is not used
400 return m_Heap.capacity() - 1;
403 /// Returns const reference to internal statistics
404 stat const& statistics() const
412 void heapify_after_push( counter_type i, tag_type curId )
417 // Move item towards top of the heap while it has higher priority than parent
419 bool bProgress = true;
420 counter_type nParent = i / 2;
421 node& refParent = m_Heap[nParent];
423 node& refItem = m_Heap[i];
426 if ( refParent.m_nTag == tag_type(Available) && refItem.m_nTag == curId ) {
427 if ( cmp( *refItem.m_pVal, *refParent.m_pVal ) > 0 ) {
428 std::swap( refItem.m_nTag, refParent.m_nTag );
429 std::swap( refItem.m_pVal, refParent.m_pVal );
430 m_Stat.onPushHeapifySwap();
434 refItem.m_nTag = tag_type(Available);
438 else if ( refParent.m_nTag == tag_type(Empty) )
440 else if ( refItem.m_nTag != curId )
455 node& refItem = m_Heap[i];
457 if ( refItem.m_nTag == curId )
458 refItem.m_nTag = tag_type(Available);
463 void heapify_after_pop( counter_type nParent, node * pParent )
467 while ( nParent < m_Heap.capacity() / 2 ) {
468 counter_type nLeft = nParent * 2;
469 counter_type nRight = nLeft + 1;
470 node& refLeft = m_Heap[nLeft];
471 node& refRight = m_Heap[nRight];
477 if ( refLeft.m_nTag == tag_type(Empty) ) {
482 else if ( refRight.m_nTag == tag_type(Empty) || cmp( *refLeft.m_pVal, *refRight.m_pVal ) > 0 ) {
493 // If child has higher priority than parent then swap
495 if ( cmp( *pChild->m_pVal, *pParent->m_pVal ) > 0 ) {
496 std::swap( pParent->m_nTag, pChild->m_nTag );
497 std::swap( pParent->m_pVal, pChild->m_pVal );
499 m_Stat.onPopHeapifySwap();
513 }} // namespace cds::intrusive
515 #endif // #ifndef __CDS_INTRUSIVE_MSPRIORITY_QUEUE_H