3 #ifndef CDSLIB_INTRUSIVE_MSPRIORITY_QUEUE_H
4 #define CDSLIB_INTRUSIVE_MSPRIORITY_QUEUE_H
6 #include <mutex> // std::unique_lock
7 #include <cds/intrusive/details/base.h>
8 #include <cds/sync/spinlock.h>
9 #include <cds/os/thread.h>
10 #include <cds/details/bit_reverse_counter.h>
11 #include <cds/intrusive/options.h>
12 #include <cds/opt/buffer.h>
13 #include <cds/opt/compare.h>
14 #include <cds/details/bounded_container.h>
16 namespace cds { namespace intrusive {
18 /// MSPriorityQueue related definitions
19 /** @ingroup cds_intrusive_helper
21 namespace mspriority_queue {
23 /// MSPriorityQueue statistics
24 template <typename Counter = cds::atomicity::event_counter>
26 typedef Counter event_counter ; ///< Event counter type
28 event_counter m_nPushCount ; ///< Count of success push operation
29 event_counter m_nPopCount ; ///< Count of success pop operation
30 event_counter m_nPushFailCount ; ///< Count of failed ("the queue is full") push operation
31 event_counter m_nPopFailCount ; ///< Count of failed ("the queue is empty") pop operation
32 event_counter m_nPushHeapifySwapCount ; ///< Count of item swapping when heapifying in push
33 event_counter m_nPopHeapifySwapCount ; ///< Count of item swapping when heapifying in pop
36 void onPushSuccess() { ++m_nPushCount ;}
37 void onPopSuccess() { ++m_nPopCount ;}
38 void onPushFailed() { ++m_nPushFailCount ;}
39 void onPopFailed() { ++m_nPopFailCount ;}
40 void onPushHeapifySwap() { ++m_nPushHeapifySwapCount ;}
41 void onPopHeapifySwap() { ++m_nPopHeapifySwapCount ;}
45 /// MSPriorityQueue empty statistics
48 void onPushSuccess() {}
49 void onPopSuccess() {}
50 void onPushFailed() {}
52 void onPushHeapifySwap() {}
53 void onPopHeapifySwap() {}
57 /// MSPriorityQueue traits
61 The storage type for the heap array. Default is \p cds::opt::v::dynamic_buffer.
63 You may specify any value type for the buffer, since at instantiation time
64 the \p buffer::rebind member metafunction is called to change the type
65 of the values stored in the buffer.
67 typedef opt::v::dynamic_buffer<void *> buffer;
69 /// Priority compare functor
71 No default functor is provided. If the option is not specified, the \p less is used.
73 typedef opt::none compare;
75 /// Specifies binary predicate used for priority comparing.
77 Default is \p std::less<T>.
79 typedef opt::none less;
81 /// Type of mutual-exclusion lock
82 typedef cds::sync::spin lock_type;
85 typedef backoff::yield back_off;
87 /// Internal statistics
89 Possible types: \p mspriority_queue::empty_stat (the default, no overhead), \p mspriority_queue::stat
90 or any other with interface like \p %mspriority_queue::stat
92 typedef empty_stat stat;
95 /// Metafunction converting option list to traits
98 - \p opt::buffer - the buffer type for the heap array. Possible types are: \p opt::v::static_buffer, \p opt::v::dynamic_buffer.
99 Default is \p %opt::v::dynamic_buffer.
100 You may specify any type of values for the buffer since at instantiation time
101 the \p buffer::rebind member metafunction is called to change the type of values stored in the buffer.
102 - \p opt::compare - priority compare functor. No default functor is provided.
103 If the option is not specified, the \p opt::less is used.
104 - \p opt::less - specifies binary predicate used for priority compare. Default is \p std::less<T>.
105 - \p opt::lock_type - lock type. Default is \p cds::sync::spin
106 - \p opt::back_off - back-off strategy. Default is \p cds::backoff::yield
107 - \p opt::stat - internal statistics. Available types: \p mspriority_queue::stat, \p mspriority_queue::empty_stat (the default, no overhead)
109 template <typename... Options>
111 # ifdef CDS_DOXYGEN_INVOKED
112 typedef implementation_defined type ; ///< Metafunction result
114 typedef typename cds::opt::make_options<
115 typename cds::opt::find_type_traits< traits, Options... >::type
121 } // namespace mspriority_queue
123 /// Michael & Scott array-based lock-based concurrent priority queue heap
124 /** @ingroup cds_intrusive_priority_queue
126 - [1996] G.Hunt, M.Michael, S. Parthasarathy, M.Scott
127 "An efficient algorithm for concurrent priority queue heaps"
129 \p %MSPriorityQueue augments the standard array-based heap data structure with
130 a mutual-exclusion lock on the heap's size and locks on each node in the heap.
131 Each node also has a tag that indicates whether
132 it is empty, valid, or in a transient state due to an update to the heap
133 by an inserting thread.
134 The algorithm allows concurrent insertions and deletions in opposite directions,
135 without risking deadlock and without the need for special server threads.
136 It also uses a "bit-reversal" technique to scatter accesses across the fringe
137 of the tree to reduce contention.
138 On large heaps the algorithm achieves significant performance improvements
139 over serialized single-lock algorithm, for various insertion/deletion
140 workloads. For small heaps it still performs well, but not as well as
141 single-lock algorithm.
144 - \p T - type to be stored in the queue. The priority is a part of \p T type.
145 - \p Traits - type traits. See \p mspriority_queue::traits for explanation.
146 It is possible to declare an option-based queue with the \p cds::intrusive::mspriority_queue::make_traits
147 metafunction instead of \p Traits template argument.
149 template <typename T, class Traits = mspriority_queue::traits >
150 class MSPriorityQueue: public cds::bounded_container
153 typedef T value_type ; ///< Value type stored in the queue
154 typedef Traits traits ; ///< Traits template parameter
156 # ifdef CDS_DOXYGEN_INVOKED
157 typedef implementation_defined key_comparator ; ///< priority comparing functor based on opt::compare and opt::less option setter.
159 typedef typename opt::details::make_comparator< value_type, traits >::type key_comparator;
162 typedef typename traits::lock_type lock_type ; ///< heap's size lock type
163 typedef typename traits::back_off back_off ; ///< Back-off strategy
164 typedef typename traits::stat stat ; ///< internal statistics type
168 typedef cds::OS::ThreadId tag_type;
179 value_type * m_pVal ; ///< A value pointer
180 tag_type volatile m_nTag ; ///< A tag
181 mutable lock_type m_Lock ; ///< Node-level lock
183 /// Creates empty node
186 , m_nTag( tag_type(Empty) )
204 typedef typename traits::buffer::template rebind<node>::other buffer_type ; ///< Heap array buffer type
207 typedef cds::bitop::bit_reverse_counter<> item_counter_type;
208 typedef typename item_counter_type::counter_type counter_type;
212 item_counter_type m_ItemCounter ; ///< Item counter
213 mutable lock_type m_Lock ; ///< Heap's size lock
214 buffer_type m_Heap ; ///< Heap array
215 stat m_Stat ; ///< internal statistics accumulator
218 /// Constructs empty priority queue
220 For cds::opt::v::static_buffer the \p nCapacity parameter is ignored.
222 MSPriorityQueue( size_t nCapacity )
223 : m_Heap( nCapacity )
226 /// Clears priority queue and destructs the object
232 /// Inserts an item into the priority queue
234 If the priority queue is full, the function returns \p false,
235 no item has been added.
236 Otherwise, the function inserts a pointer to \p val (no copy is made) into the heap
239 The function does not copy \p val: the heap stores a pointer to it, so \p val must stay alive while it is in the queue.
241 bool push( value_type& val )
243 tag_type const curId = cds::OS::get_current_thread_id();
245 // Insert new item at bottom of the heap
247 if ( m_ItemCounter.value() >= capacity() ) {
250 m_Stat.onPushFailed();
254 counter_type i = m_ItemCounter.inc();
255 assert( i < m_Heap.capacity() );
257 node& refNode = m_Heap[i];
260 refNode.m_pVal = &val;
261 refNode.m_nTag = curId;
264 // Move item towards top of the heap while it has higher priority than parent
265 heapify_after_push( i, curId );
267 m_Stat.onPushSuccess();
271 /// Extracts the item with the highest priority
273 If the priority queue is empty, the function returns \p nullptr.
274 Otherwise, it returns the item extracted.
276 The item returned may be disposed immediately.
281 if ( m_ItemCounter.value() == 0 ) {
284 m_Stat.onPopFailed();
287 counter_type nBottom = m_ItemCounter.reversed_value();
289 // Since m_Heap[0] is not used, capacity() returns m_Heap.capacity() - 1
290 // Consequently, "<=" is here
291 assert( nBottom <= capacity() );
292 assert( nBottom > 0 );
294 node& refBottom = m_Heap[ nBottom ];
297 refBottom.m_nTag = tag_type(Empty);
298 value_type * pVal = refBottom.m_pVal;
299 refBottom.m_pVal = nullptr;
302 node& refTop = m_Heap[ 1 ];
304 if ( refTop.m_nTag == tag_type(Empty) ) {
307 m_Stat.onPopSuccess();
311 std::swap( refTop.m_pVal, pVal );
312 refTop.m_nTag = tag_type( Available );
314 assert( nBottom > 1 );
316 // refTop will be unlocked inside heapify_after_pop
317 heapify_after_pop( 1, &refTop );
319 m_Stat.onPopSuccess();
323 /// Clears the queue (not atomic)
325 This function is not atomic, but it is thread-safe
329 clear_with( []( value_type const& /*src*/ ) {} );
332 /// Clears the queue (not atomic)
334 This function is not atomic, but it is thread-safe.
336 For each item removed the functor \p f is called.
337 \p Func interface is:
341 void operator()( value_type& item );
344 A lambda function or a function pointer can be used as \p f.
346 template <typename Func>
347 void clear_with( Func f )
350 value_type * pVal = pop();
356 /// Checks if the priority queue is empty
362 /// Checks if the priority queue is full
365 return size() == capacity();
368 /// Returns current size of priority queue
371 std::unique_lock<lock_type> l( m_Lock );
372 size_t nSize = (size_t) m_ItemCounter.value();
376 /// Returns the capacity of the priority queue
377 size_t capacity() const
379 // m_Heap[0] is not used
380 return m_Heap.capacity() - 1;
383 /// Returns const reference to internal statistics
384 stat const& statistics() const
392 void heapify_after_push( counter_type i, tag_type curId )
397 // Move item towards top of the heap while it has higher priority than parent
399 bool bProgress = true;
400 counter_type nParent = i / 2;
401 node& refParent = m_Heap[nParent];
403 node& refItem = m_Heap[i];
406 if ( refParent.m_nTag == tag_type(Available) && refItem.m_nTag == curId ) {
407 if ( cmp( *refItem.m_pVal, *refParent.m_pVal ) > 0 ) {
408 std::swap( refItem.m_nTag, refParent.m_nTag );
409 std::swap( refItem.m_pVal, refParent.m_pVal );
410 m_Stat.onPushHeapifySwap();
414 refItem.m_nTag = tag_type(Available);
418 else if ( refParent.m_nTag == tag_type(Empty) )
420 else if ( refItem.m_nTag != curId )
435 node& refItem = m_Heap[i];
437 if ( refItem.m_nTag == curId )
438 refItem.m_nTag = tag_type(Available);
443 void heapify_after_pop( counter_type nParent, node * pParent )
447 while ( nParent < m_Heap.capacity() / 2 ) {
448 counter_type nLeft = nParent * 2;
449 counter_type nRight = nLeft + 1;
450 node& refLeft = m_Heap[nLeft];
451 node& refRight = m_Heap[nRight];
457 if ( refLeft.m_nTag == tag_type(Empty) ) {
462 else if ( refRight.m_nTag == tag_type(Empty) || cmp( *refLeft.m_pVal, *refRight.m_pVal ) > 0 ) {
473 // If the child has higher priority than the parent, then swap
475 if ( cmp( *pChild->m_pVal, *pParent->m_pVal ) > 0 ) {
476 std::swap( pParent->m_nTag, pChild->m_nTag );
477 std::swap( pParent->m_pVal, pChild->m_pVal );
479 m_Stat.onPopHeapifySwap();
493 }} // namespace cds::intrusive
495 #endif // #ifndef CDSLIB_INTRUSIVE_MSPRIORITY_QUEUE_H