3 #ifndef CDSLIB_INTRUSIVE_MSPRIORITY_QUEUE_H
4 #define CDSLIB_INTRUSIVE_MSPRIORITY_QUEUE_H
6 #include <mutex> // std::unique_lock
7 #include <cds/intrusive/details/base.h>
8 #include <cds/sync/spinlock.h>
9 #include <cds/os/thread.h>
10 #include <cds/details/bit_reverse_counter.h>
11 #include <cds/intrusive/options.h>
12 #include <cds/opt/buffer.h>
13 #include <cds/opt/compare.h>
14 #include <cds/details/bounded_container.h>
16 namespace cds { namespace intrusive {
18 /// MSPriorityQueue related definitions
19 /** @ingroup cds_intrusive_helper
21 namespace mspriority_queue {
23 /// MSPriorityQueue statistics: event counters updated through the \p on* handlers below
24 template <typename Counter = cds::atomicity::event_counter>
26 typedef Counter event_counter ; ///< Event counter type
28 event_counter m_nPushCount ; ///< Number of successful push operations
29 event_counter m_nPopCount ; ///< Number of successful pop operations
30 event_counter m_nPushFailCount ; ///< Number of failed push operations ("the queue is full")
31 event_counter m_nPopFailCount ; ///< Number of failed pop operations ("the queue is empty")
32 event_counter m_nPushHeapifySwapCount ; ///< Number of item swaps performed while heapifying in push
33 event_counter m_nPopHeapifySwapCount ; ///< Number of item swaps performed while heapifying in pop
36 void onPushSuccess() { ++m_nPushCount ;} ///< Increments \p m_nPushCount
37 void onPopSuccess() { ++m_nPopCount ;} ///< Increments \p m_nPopCount
38 void onPushFailed() { ++m_nPushFailCount ;} ///< Increments \p m_nPushFailCount
39 void onPopFailed() { ++m_nPopFailCount ;} ///< Increments \p m_nPopFailCount
40 void onPushHeapifySwap() { ++m_nPushHeapifySwapCount ;} ///< Increments \p m_nPushHeapifySwapCount
41 void onPopHeapifySwap() { ++m_nPopHeapifySwapCount ;} ///< Increments \p m_nPopHeapifySwapCount
45 /// MSPriorityQueue empty statistics: every event handler has an empty body, so nothing is counted
48 void onPushSuccess() {}
49 void onPopSuccess() {}
50 void onPushFailed() {}
52 void onPushHeapifySwap() {}
53 void onPopHeapifySwap() {}
57 /// MSPriorityQueue traits
61 The storage type for the heap array. Default is \p cds::opt::v::dynamic_buffer.
63 You may specify any value type for the buffer, since at instantiation time
64 the \p buffer::rebind member metafunction is called to change the type
65 of values stored in the buffer.
67 typedef opt::v::dynamic_buffer<void *> buffer; // the <void *> value type is a placeholder; the queue rebinds the buffer to its node type
69 /// Priority comparison functor
71 No default functor is provided. If the option is not specified, the \p less is used.
73 typedef opt::none compare; // opt::none: no compare functor specified
75 /// Binary predicate used for priority comparison.
77 Default is \p std::less<T>.
79 typedef opt::none less; // opt::none: no predicate specified
81 /// Type of the mutual-exclusion lock
82 typedef cds::sync::spin lock_type;
85 typedef backoff::yield back_off; // back-off strategy
87 /// Internal statistics
89 Possible types: \p mspriority_queue::empty_stat (the default, no overhead), \p mspriority_queue::stat
90 or any other with interface like \p %mspriority_queue::stat
92 typedef empty_stat stat;
95 /// Metafunction converting option list to traits
98 - \p opt::buffer - the buffer type for the heap array. Possible types are: \p opt::v::static_buffer, \p opt::v::dynamic_buffer.
99 Default is \p %opt::v::dynamic_buffer.
100 You may specify any type of values for the buffer since at instantiation time
101 the \p buffer::rebind member metafunction is called to change the type of values stored in the buffer.
102 - \p opt::compare - priority compare functor. No default functor is provided.
103 If the option is not specified, the \p opt::less is used.
104 - \p opt::less - specifies binary predicate used for priority compare. Default is \p std::less<T>.
105 - \p opt::lock_type - lock type. Default is \p cds::sync::spin
106 - \p opt::back_off - back-off strategy. Default is \p cds::backoff::yield
107 - \p opt::stat - internal statistics. Available types: \p mspriority_queue::stat, \p mspriority_queue::empty_stat (the default, no overhead)
109 template <typename... Options>
111 # ifdef CDS_DOXYGEN_INVOKED
112 typedef implementation_defined type ; ///< Metafunction result
114 typedef typename cds::opt::make_options<
115 typename cds::opt::find_type_traits< traits, Options... >::type
121 } // namespace mspriority_queue
123 /// Michael & Scott array-based lock-based concurrent priority queue heap
124 /** @ingroup cds_intrusive_priority_queue
126 - [1996] G.Hunt, M.Michael, S. Parthasarathy, M.Scott
127 "An efficient algorithm for concurrent priority queue heaps"
129 \p %MSPriorityQueue augments the standard array-based heap data structure with
130 a mutual-exclusion lock on the heap's size and locks on each node in the heap.
131 Each node also has a tag that indicates whether
132 it is empty, valid, or in a transient state due to an update to the heap
133 by an inserting thread.
134 The algorithm allows concurrent insertions and deletions in opposite directions,
135 without risking deadlock and without the need for special server threads.
136 It also uses a "bit-reversal" technique to scatter accesses across the fringe
137 of the tree to reduce contention.
138 On large heaps the algorithm achieves significant performance improvements
139 over serialized single-lock algorithm, for various insertion/deletion
140 workloads. For small heaps it still performs well, but not as well as
141 single-lock algorithm.
144 - \p T - type to be stored in the queue. The priority is a part of \p T type.
145 - \p Traits - type traits. See \p mspriority_queue::traits for explanation.
146 It is possible to declare an option-based queue with the \p cds::intrusive::mspriority_queue::make_traits
147 metafunction instead of \p Traits template argument.
149 template <typename T, class Traits = mspriority_queue::traits >
150 class MSPriorityQueue: public cds::bounded_container
153 typedef T value_type ; ///< Value type stored in the queue
154 typedef Traits traits ; ///< Traits template parameter
156 # ifdef CDS_DOXYGEN_INVOKED
157 typedef implementation_defined key_comparator ; ///< Priority comparison functor based on the \p opt::compare and \p opt::less option setters.
159 typedef typename opt::details::make_comparator< value_type, traits >::type key_comparator;
162 typedef typename traits::lock_type lock_type ; ///< Lock type used for the heap's size lock and for the node-level locks
163 typedef typename traits::back_off back_off ; ///< Back-off strategy
164 typedef typename traits::stat stat ; ///< Internal statistics type
168 typedef cds::OS::ThreadId tag_type; // node tag type: holds \p Empty, \p Available or the id of an inserting thread
179 value_type * m_pVal ; ///< Pointer to the stored value
180 tag_type volatile m_nTag ; ///< Node tag: \p Empty, \p Available, or the id of the thread that is inserting the item
// NOTE(review): volatile by itself gives no inter-thread ordering; presumably every m_nTag access happens under m_Lock - confirm
181 mutable lock_type m_Lock ; ///< Node-level lock
183 /// Creates an empty node (the tag is initialized to \p Empty)
186 , m_nTag( tag_type(Empty) )
204 typedef typename traits::buffer::template rebind<node>::other buffer_type ; ///< Heap array buffer type (\p traits::buffer rebound to \p node)
207 typedef cds::bitop::bit_reverse_counter<> item_counter_type; // counter that also yields the heap slot index for the next/last item
208 typedef typename item_counter_type::counter_type counter_type; // integer type used for heap indices
212 item_counter_type m_ItemCounter ; ///< Item counter
213 mutable lock_type m_Lock ; ///< Heap's size lock
214 buffer_type m_Heap ; ///< Heap array; slot 0 is not used
215 stat m_Stat ; ///< Internal statistics accumulator
218 /// Constructs an empty priority queue whose heap array is created with \p nCapacity
220 For cds::opt::v::static_buffer the \p nCapacity parameter is ignored.
222 MSPriorityQueue( size_t nCapacity )
223 : m_Heap( nCapacity ) // NOTE(review): capacity() reports m_Heap.capacity() - 1 (slot 0 is unused), so the usable size may be nCapacity - 1 - confirm against the buffer's rounding
226 /// Clears priority queue and destructs the object
232 /// Inserts an item into the priority queue
234 If the priority queue is full, the function returns \p false,
235 no item has been added.
236 Otherwise, the function inserts the pointer to \p val into the heap
239 The function does not make a copy of \p val.
241 bool push( value_type& val )
243 tag_type const curId = cds::OS::get_current_thread_id(); // the new node is tagged with this id while it is being placed
245 // Insert new item at bottom of the heap
247 if ( m_ItemCounter.value() >= capacity() ) {
250 m_Stat.onPushFailed(); // the queue is full: count the failed push
254 counter_type i = m_ItemCounter.inc(); // index of the heap slot that receives the new item
255 assert( i < m_Heap.capacity() );
257 node& refNode = m_Heap[i];
260 refNode.m_pVal = &val; // only the address is stored; val itself is not copied
261 refNode.m_nTag = curId;
264 // Move item towards top of the heap while it has higher priority than parent
265 heapify_after_push( i, curId );
267 m_Stat.onPushSuccess();
271 /// Extracts the item with the highest priority
273 If the priority queue is empty, the function returns \p nullptr.
274 Otherwise, it returns the item extracted.
279 if ( m_ItemCounter.value() == 0 ) {
282 m_Stat.onPopFailed(); // the queue is empty: count the failed pop
285 counter_type nBottom = m_ItemCounter.reversed_value(); // index of the bottom heap slot to detach
287 // Since m_Heap[0] is not used, capacity() returns m_Heap.capacity() - 1
288 // Consequently, "<=" is here
289 assert( nBottom <= capacity() );
290 assert( nBottom > 0 );
292 node& refBottom = m_Heap[ nBottom ];
// Detach the bottom item: mark its slot Empty and take over its value pointer
295 refBottom.m_nTag = tag_type(Empty);
296 value_type * pVal = refBottom.m_pVal;
297 refBottom.m_pVal = nullptr;
300 node& refTop = m_Heap[ 1 ]; // slot 1 is the top of the heap
// NOTE(review): the top slot is presumably Empty here only when the detached bottom item
// was the top itself (compare the nBottom > 1 assertion below) - confirm
302 if ( refTop.m_nTag == tag_type(Empty) ) {
305 m_Stat.onPopSuccess();
// Put the detached bottom item into the top slot; after the swap pVal holds the former top item
309 std::swap( refTop.m_pVal, pVal );
310 refTop.m_nTag = tag_type( Available );
312 assert( nBottom > 1 );
314 // refTop will be unlocked inside heapify_after_pop
315 heapify_after_pop( 1, &refTop );
317 m_Stat.onPopSuccess();
321 /// Clears the queue (not atomic)
323 This function is not atomic, but it is thread-safe
327 clear_with( []( value_type const& /*src*/ ) {} );
330 /// Clears the queue (not atomic)
332 This function is not atomic, but it is thread-safe.
334 For each item removed the functor \p f is called.
335 \p Func interface is:
339 void operator()( value_type& item );
342 A lambda function or a function pointer can be used as \p f.
344 template <typename Func>
345 void clear_with( Func f )
348 value_type * pVal = pop(); // items are removed one at a time through pop()
354 /// Checks if the priority queue is empty
360 /// Checks if the priority queue is full
363 return size() == capacity(); // full when the current size equals the usable capacity
366 /// Returns the current size of the priority queue
369 std::unique_lock<lock_type> l( m_Lock ); // the item counter is read under the heap's size lock
370 size_t nSize = (size_t) m_ItemCounter.value();
374 /// Returns the capacity of the priority queue (the number of usable heap slots)
375 size_t capacity() const
377 // m_Heap[0] is not used, so one slot is subtracted from the buffer capacity
378 return m_Heap.capacity() - 1;
381 /// Returns a const reference to the internal statistics
382 stat const& statistics() const
// Restores heap order after push(): \p i is the heap slot of the newly inserted item,
// \p curId is the tag (inserting thread id) that push() stored in that slot.
390 void heapify_after_push( counter_type i, tag_type curId )
395 // Move item towards top of the heap while it has higher priority than parent
397 bool bProgress = true;
398 counter_type nParent = i / 2; // parent slot in the 1-based heap layout (slot 0 is unused)
399 node& refParent = m_Heap[nParent];
401 node& refItem = m_Heap[i];
// The parent is Available and the item still carries our tag: compare the two
404 if ( refParent.m_nTag == tag_type(Available) && refItem.m_nTag == curId ) {
405 if ( cmp( *refItem.m_pVal, *refParent.m_pVal ) > 0 ) {
// The item has higher priority than its parent: exchange tags and values so that it moves one level up
406 std::swap( refItem.m_nTag, refParent.m_nTag );
407 std::swap( refItem.m_pVal, refParent.m_pVal );
408 m_Stat.onPushHeapifySwap();
412 refItem.m_nTag = tag_type(Available); // presumably the no-swap case: the item stays here and becomes Available - confirm
416 else if ( refParent.m_nTag == tag_type(Empty) )
418 else if ( refItem.m_nTag != curId ) // slot i no longer carries our tag; presumably another thread moved the item - confirm
433 node& refItem = m_Heap[i];
// If the slot still carries our tag, publish the item as Available
435 if ( refItem.m_nTag == curId )
436 refItem.m_nTag = tag_type(Available);
// Restores heap order after pop(): starts at heap slot \p nParent, whose node is \p pParent
// (pop() passes slot 1 and the address of the top node).
441 void heapify_after_pop( counter_type nParent, node * pParent )
// The loop bound keeps both child indices (2 * nParent and 2 * nParent + 1) below m_Heap.capacity()
445 while ( nParent < m_Heap.capacity() / 2 ) {
446 counter_type nLeft = nParent * 2;
447 counter_type nRight = nLeft + 1;
448 node& refLeft = m_Heap[nLeft];
449 node& refRight = m_Heap[nRight];
455 if ( refLeft.m_nTag == tag_type(Empty) ) {
// The right child is Empty, or the left child has higher priority than the right one
// (presumably the left child is selected as pChild in this branch - confirm)
460 else if ( refRight.m_nTag == tag_type(Empty) || cmp( *refLeft.m_pVal, *refRight.m_pVal ) > 0 ) {
471 // If the child has higher priority than the parent then swap
473 if ( cmp( *pChild->m_pVal, *pParent->m_pVal ) > 0 ) {
474 std::swap( pParent->m_nTag, pChild->m_nTag );
475 std::swap( pParent->m_pVal, pChild->m_pVal );
477 m_Stat.onPopHeapifySwap();
491 }} // namespace cds::intrusive
493 #endif // #ifndef CDSLIB_INTRUSIVE_MSPRIORITY_QUEUE_H